import unittest from ftplib import FTP, error_perm, error_temp import os from io import BytesIO import time import ssl import socket import ftplib import re FTP_HOST = '127.0.0.1' FTP_PORT = 21 FTP_USER = 'username' FTP_PASS = 'password' TEST_FILENAME = 'test.txt' TEST_CONTENT = b'1234567890' # 10 bytes known content DOWNLOAD_FILENAME = 'downloaded_test.txt' UPLOAD_FILENAME = 'upload_test.txt' RESUME_FILENAME = 'resume_test.txt' class FTPServerRFCComplianceTests(unittest.TestCase): def setUp(self): # Create local test file first with open(TEST_FILENAME, 'wb') as f: f.write(TEST_CONTENT) # Connect and login self.ftp = FTP() self.ftp.connect(FTP_HOST, FTP_PORT, timeout=10) self.ftp.login(FTP_USER, FTP_PASS) # Upload file to FTP server with open(TEST_FILENAME, 'rb') as f: self.ftp.storbinary(f"STOR {TEST_FILENAME}", f) def tearDown(self): try: # Remove test files from server if they exist for fname in [TEST_FILENAME, UPLOAD_FILENAME, RESUME_FILENAME]: try: self.ftp.delete(fname) except Exception: pass self.ftp.quit() except Exception: self.ftp.close() # Remove local temp files for fname in [TEST_FILENAME, DOWNLOAD_FILENAME, UPLOAD_FILENAME]: if os.path.exists(fname): os.remove(fname) def test_rnto_fails_after_cwd_change(self): dir1 = "dir_one" dir2 = "dir_two" filename = "file.txt" src_path = f"/{dir1}/{filename}" local_tmp = "/dev/null" # Replace if needed with ftplib.FTP() as ftp: ftp.connect(FTP_HOST, FTP_PORT, timeout=5) ftp.login(FTP_USER, FTP_PASS) ftp.encoding = 'utf-8' # Create both directories for d in (dir1, dir2): try: ftp.mkd(d) except ftplib.error_perm: pass # Clean up any existing test files for d in (dir1, dir2): try: ftp.delete(f"/{d}/{filename}") except ftplib.error_perm: pass # Upload the file into dir1 ftp.cwd(f"/{dir1}") ftp.storbinary(f"STOR {filename}", open(local_tmp, "rb")) # Start rename process ftp.cwd("/") # ensure clean state rnfr_response = ftp.sendcmd(f"RNFR {src_path}") assert rnfr_response.startswith("350"), f"Expected 350 after RNFR, got: {rnfr_response}" # Change working directory before RNTO ftp.cwd(f"/{dir2}") # RNTO should now fail if relative path or context is invalid try: ftp.sendcmd(f"RNTO {filename}") # not a valid target now assert False, "RNTO should have failed but didn't" except ftplib.error_perm as e: assert "503" in str(e) or "450" in str(e), f"Expected RNTO failure error, got: {e}" # Cleanup ftp.cwd(f"/{dir1}") ftp.delete(filename) def test_stor_write(self): original_data = b'abcdefghij' with open(RESUME_FILENAME, 'wb') as f: f.write(original_data) with open(RESUME_FILENAME, 'rb') as f: self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) downloaded = BytesIO() self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) self.assertEqual(downloaded.getvalue(), original_data) def test_stor_resume_with_rest(self): original_data = b'abcdefghij' resume_data = b'XYZ' expected_data = b'abcdeXYZij' with open(RESUME_FILENAME, 'wb') as f: f.write(original_data) with open(RESUME_FILENAME, 'rb') as f: self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) self.ftp.sendcmd('REST 5') self.ftp.storbinary(f'STOR {RESUME_FILENAME}', BytesIO(resume_data)) downloaded = BytesIO() self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) self.assertEqual(downloaded.getvalue(), expected_data) def test_appe_append(self): initial_data = b'12345' append_data = b'67890' expected_data = b'1234567890' with open(RESUME_FILENAME, 'wb') as f: f.write(initial_data) with open(RESUME_FILENAME, 'rb') as f: self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) self.ftp.storbinary(f'APPE {RESUME_FILENAME}', BytesIO(append_data)) downloaded = BytesIO() time.sleep(0.1) self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) self.assertEqual(downloaded.getvalue(), expected_data) def test_220_service_ready(self): welcome = self.ftp.getwelcome() self.assertTrue(welcome.startswith('220'), f"Expected 220 response, got: {welcome}") def test_user_login_success(self): resp = self.ftp.login(user=FTP_USER, passwd=FTP_PASS) self.assertTrue(resp.startswith('230'), f"Login failed or unexpected response: {resp}") def test_user_login_fail(self): with self.assertRaises((error_perm, error_temp)): self.ftp.login(user='wronguser', passwd='wrongpass') def test_pwd_and_cwd(self): pwd = self.ftp.pwd() self.assertTrue(pwd.startswith('/'), f"PWD should return directory path, got: {pwd}") resp = self.ftp.cwd('/') self.assertTrue(resp.startswith('250'), f"CWD should succeed with 250 response, got: {resp}") def test_utf8_mkd(self): test_dirs = ["директория", "データ", "résumé"] with ftplib.FTP() as ftp: ftp.connect(FTP_HOST, FTP_PORT, timeout=5) ftp.login(FTP_USER, FTP_PASS) ftp.encoding = 'utf-8' try: ftp.sendcmd("OPTS UTF8 ON") except Exception: pass for d in test_dirs: try: ftp.rmd(d) except ftplib.error_perm: pass response = ftp.mkd(d) # Try to extract directory name if 257 response m = re.match(r'257\s+"?(.*?)"?(\s|$)', response) if m: dir_in_response = m.group(1).strip('"') else: # Fallback: use the full response as the directory name dir_in_response = response.strip() assert d in dir_in_response or dir_in_response.endswith(d), \ f"Directory name mismatch: expected '{d}', got '{dir_in_response}'" ftp.rmd(d) def test_rnfr_rnto_same_dir(self): original_name = "old_name.txt" new_name = "new_name.txt" with ftplib.FTP() as ftp: ftp.connect(FTP_HOST, FTP_PORT, timeout=5) ftp.login(FTP_USER, FTP_PASS) ftp.encoding = 'utf-8' # Enter a known working directory (e.g., root or home) ftp.cwd("/") wd = ftp.pwd() # Clean up before test for f in (original_name, new_name): try: ftp.delete(f) except ftplib.error_perm: pass # Upload a dummy file ftp.storbinary("STOR " + original_name, open("/dev/null", "rb")) # Run RNFR and RNTO from the same directory ftp.cwd(wd) # ensure working directory is consistent rnfr_response = ftp.sendcmd("RNFR " + original_name) assert rnfr_response.startswith("350"), f"Unexpected RNFR response: {rnfr_response}" rnto_response = ftp.sendcmd("RNTO " + new_name) assert rnto_response.startswith("250"), f"Unexpected RNTO response: {rnto_response}" # Verify rename files = ftp.nlst() assert new_name in files assert original_name not in files # Cleanup ftp.delete(new_name) def test_rnfr_rnto_across_dirs(self): dir1 = "folder_a" dir2 = "folder_b" filename = "testfile.txt" local_tmp = "/dev/null" # or create an empty temp file path_src = f"/{dir1}/{filename}" path_dst = f"/{dir2}/{filename}" with ftplib.FTP() as ftp: ftp.connect(FTP_HOST, FTP_PORT, timeout=5) ftp.login(FTP_USER, FTP_PASS) ftp.encoding = 'utf-8' # Create both directories for d in (dir1, dir2): try: ftp.mkd(d) except ftplib.error_perm: pass # Cleanup possible leftovers for path in (path_src, path_dst): try: ftp.delete(path) except ftplib.error_perm: pass # Upload a dummy file to dir1 ftp.cwd(f"/{dir1}") ftp.storbinary("STOR " + filename, open(local_tmp, "rb")) # Rename from dir1 to dir2 rnfr_response = ftp.sendcmd(f"RNFR {path_src}") assert rnfr_response.startswith("350"), f"RNFR failed: {rnfr_response}" rnto_response = ftp.sendcmd(f"RNTO {path_dst}") assert rnto_response.startswith("250"), f"RNTO failed: {rnto_response}" # Confirm the file is in dir2 ftp.cwd(f"/{dir2}") files = ftp.nlst() assert filename in files, f"{filename} not found in {dir2}" # Confirm it's not in dir1 ftp.cwd(f"/{dir1}") files = ftp.nlst() assert filename not in files, f"{filename} still present in {dir1}" # Cleanup ftp.cwd(f"/{dir2}") ftp.delete(filename) def utf8_mkd(self, test_dirs): #test_dirs = [ # "Café", # "测试", # "директория", # "データ", # "résumé" #] with ftplib.FTP() as ftp: ftp.connect(FTP_HOST, FTP_PORT, timeout=5) ftp.login(FTP_USER, FTP_PASS) ftp.encoding = 'utf-8' try: print("C: OPTS UTF8 ON") response = ftp.sendcmd("OPTS UTF8 ON") print(f"S: {repr(response)}") except Exception as e: print(f"Warning: OPTS UTF8 ON failed: {e}") for dir_name_utf8 in test_dirs: with self.subTest(dir=dir_name_utf8): try: # Delete if exists try: ftp.cwd(dir_name_utf8) ftp.cwd("..") ftp.rmd(dir_name_utf8) print(f"Deleted existing dir: {dir_name_utf8}") except ftplib.error_perm: pass # Create directory print(f"C: MKD {dir_name_utf8}") response = ftp.mkd(dir_name_utf8) print(f"S: {repr(response)}") self.assertTrue(response.startswith('257'), f"MKD failed: Server responded with: {repr(response)}") # Verify directory is accessible by changing to it ftp.cwd(dir_name_utf8) ftp.cwd("..") # Cleanup ftp.rmd(dir_name_utf8) print(f"Removed directory: {dir_name_utf8}") except Exception as e: self.fail(f"Exception for directory '{dir_name_utf8}': {e}") def test_pbsz_prot_violations(self): HOST = '127.0.0.1' PORT = 21 print("\n--- 12-15. Verifying Rules: PBSZ/PROT Command Sequences and Validation ---") # Scenario 1: Invoke PBSZ without prior security exchange (plaintext) print("\n[+] Scenario 1: Invoke PBSZ on plaintext connection") with socket.create_connection((HOST, PORT), timeout=5) as sock: banner = sock.recv(1024).decode() print(f"S: {banner.strip()}") sock.sendall(b'PBSZ 0\r\n') print("C: PBSZ 0") response = sock.recv(1024).decode().strip() print(f"S: {response}") self.assertNotEqual(response.startswith('200'), True, "Violation: PBSZ accepted without security.") # Scenario 2 & 3: Use TLS connection for further checks print("\n[+] Scenario 2: Invoke PBSZ with invalid parameters") sock = socket.create_connection((HOST, PORT), timeout=5) banner = sock.recv(1024).decode() print(f"S: {banner.strip()}") sock.sendall(b'AUTH TLS\r\n') auth_response = sock.recv(1024).decode().strip() print(f"C: AUTH TLS\nS: {auth_response}") context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE ssock = context.wrap_socket(sock, server_hostname=HOST) try: ssock.sendall(b'PBSZ not_a_number\r\n') print("C: PBSZ not_a_number") response = ssock.recv(1024).decode().strip() print(f"S: {response}") self.assertNotEqual(response.startswith('200'), True, "Violation: Server accepted invalid PBSZ parameter.") print("\n[+] Scenario 3: Invoke PROT on TLS connection without prior PBSZ") ssock.sendall(b'PROT P\r\n') print("C: PROT P") response_prot = ssock.recv(1024).decode().strip() print(f"S: {response_prot}") self.assertNotEqual(response_prot.startswith('200'), True, "Violation: Server accepted PROT without prior PBSZ.") finally: ssock.close() def test_feat_space_indent(self): """ Verify whether each feature in the FEAT response correctly starts with a single space. """ HOST = '127.0.0.1' PORT = 21 USER = 'username' PASS = 'password' print("\n--- 8. Verifying Rule: Missing Space Before FEAT Features ---") try: sock = socket.create_connection((HOST, PORT), timeout=10) welcome = sock.recv(1024).decode().strip() print(f"S: {welcome}") sock.sendall(f'USER {USER}\r\n'.encode()) resp = sock.recv(1024).decode().strip() print(f"C: USER {USER}\nS: {resp}") sock.sendall(f'PASS {PASS}\r\n'.encode()) resp = sock.recv(1024).decode().strip() print(f"C: PASS {PASS}\nS: {resp}") self.assertTrue(resp.startswith('230'), "Login failed") sock.sendall(b'FEAT\r\n') full_response = b'' while True: data = sock.recv(1024) if not data: break full_response += data if b'211 End.' in full_response: break response_text = full_response.decode('utf-8', errors='ignore') lines = response_text.split('\r\n') feature_lines = [] in_features = False for line in lines: if line.startswith('211-'): in_features = True continue elif line.startswith('211 '): break elif in_features and line.strip(): feature_lines.append(line) self.assertTrue(len(feature_lines) > 0, "No feature lines found in FEAT response") for line in feature_lines: self.assertTrue(line.startswith(' '), f"Feature line does not start with a space: '{line}'") except Exception as e: self.fail(f"Exception occurred during FEAT space indent verification: {e}") finally: if 'sock' in locals(): try: sock.close() except: pass def test_pasv_retr(self): self.ftp.login(FTP_USER, FTP_PASS) self.ftp.set_pasv(True) with open(DOWNLOAD_FILENAME, 'wb') as f: self.ftp.retrbinary(f'RETR {TEST_FILENAME}', f.write) self.assertTrue(os.path.exists(DOWNLOAD_FILENAME), "Downloaded file does not exist") # Check downloaded file size is > 0 self.assertGreater(os.path.getsize(DOWNLOAD_FILENAME), 0, "Downloaded file is empty") def test_active_retr(self): self.ftp.set_pasv(False) try: with open(DOWNLOAD_FILENAME, 'wb') as f: self.ftp.retrbinary(f'RETR {TEST_FILENAME}', f.write) except EOFError: self.skipTest("Active mode data connection failed") self.assertTrue(os.path.exists(DOWNLOAD_FILENAME), "Downloaded file does not exist") def test_rest_command(self): with open(TEST_FILENAME, 'rb') as f: f.seek(5) expected_data = f.read() self.ftp.sendcmd('REST 5') data = bytearray() self.ftp.retrbinary(f'RETR {TEST_FILENAME}', data.extend) self.assertEqual(data, expected_data, f"REST transfer mismatch.\nExpected: {expected_data!r}\nActual : {data!r}") def log_command(direction, command, response=None): """Log FTP commands and responses""" if direction == "C": print(f"C: {command}") elif direction == "S": print(f"S: {response}") elif direction == "ERROR": print(f"ERROR: {command}") elif direction == "INFO": print(f"INFO: {command}") def test_rest_stor_resume_explicit(self): """ Test resuming an upload using REST + STOR. This should append or overwrite starting from the specified offset. """ original_data = b'abcdefghij' # 10 bytes resume_data = b'XYZ' # Will overwrite at offset 5 → expect: abcdeXYZij expected_data = b'abcdeXYZij' # Step 1: Upload original data with open(RESUME_FILENAME, 'wb') as f: f.write(original_data) with open(RESUME_FILENAME, 'rb') as f: self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) # Step 2: Resume upload from offset 5 with 'XYZ' self.ftp.sendcmd('REST 5') self.ftp.storbinary(f'STOR {RESUME_FILENAME}', BytesIO(resume_data)) # Step 3: Download and verify final file content downloaded = BytesIO() self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) result = downloaded.getvalue() self.assertEqual(result, expected_data, f"REST+STOR resume failed.\nExpected: {expected_data}\nActual : {result}") def test_rest_violations(self): FILE = "test_rest.txt" CONTENT = b"1234567890" print("\n--- Verifying Rules: REST Command Sequence and Implementation ---") print("Test Objective: Verify that the REST command correctly limits subsequent commands and handles offsets") try: with ftplib.FTP() as ftp: ftp.set_debuglevel(0) ftp.connect(FTP_HOST, FTP_PORT, timeout=5) ftp.login(FTP_USER, FTP_PASS) self.log_command("INFO", "Login successful") self.log_command("C", "TYPE I") resp = ftp.sendcmd("TYPE I") self.log_command("S", resp) self.log_command("C", f"STOR {FILE}") ftp.storbinary(f"STOR {FILE}", BytesIO(CONTENT)) self.log_command("INFO", f"File uploaded successfully, content: {CONTENT} (length: {len(CONTENT)})") print("\n" + "="*60) print("[+] Scenario 1: Verify if RETR correctly handles the offset set by REST") print(" RFC Requirement: RETR should start transferring from the byte position specified by REST") rest_offset = 5 self.log_command("C", f"REST {rest_offset}") response_rest2 = ftp.sendcmd(f"REST {rest_offset}") self.log_command("S", response_rest2) self.assertTrue(response_rest2.startswith('350'), f"REST {rest_offset} failed: {response_rest2}") self.log_command("C", f"RETR {FILE}") out = BytesIO() ftp.retrbinary(f"RETR {FILE}", out.write) retrieved_content = out.getvalue() print(f"\n Transfer Result Analysis:") print(f" Original File Content: {CONTENT} (length: {len(CONTENT)})") print(f" REST Offset: {rest_offset} (starting from byte {rest_offset})") print(f" Expected Transferred Content: {CONTENT[rest_offset:]} (length: {len(CONTENT[rest_offset:])})") print(f" Actual Transferred Content: {retrieved_content} (length: {len(retrieved_content)})") # Assertion based on REST behavior if len(retrieved_content) == len(CONTENT): self.fail("RETR ignored REST offset: entire file downloaded.") elif len(retrieved_content) == len(CONTENT) - rest_offset and retrieved_content == CONTENT[rest_offset:]: print("No violation detected. RETR correctly handled the REST offset.") else: self.fail("REST offset handled incorrectly or data mismatch.") try: self.log_command("C", f"DELE {FILE}") # ftp.delete(FILE) # Uncomment if delete supported by server self.log_command("INFO", "Test file cleanup completed") except Exception: self.log_command("ERROR", "Failed to cleanup test file") except Exception as e: self.fail(f"Error during REST violation test: {e}") def test_rest_stor_resume(self): with open(UPLOAD_FILENAME, 'wb') as f: f.write(TEST_CONTENT) with open(UPLOAD_FILENAME, 'rb') as f: self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) self.ftp.sendcmd('REST 5') with open(UPLOAD_FILENAME, 'rb') as f: f.seek(5) resp = self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) self.assertTrue(resp.startswith('226') or resp.startswith('250'), f"STOR after REST failed: {resp}") def test_rest_cleared_after_retr(self): self.ftp.sendcmd('REST 5') self.ftp.retrbinary(f'RETR {TEST_FILENAME}', lambda _: None) # discard output # Next RETR should start from 0 data = bytearray() self.ftp.retrbinary(f'RETR {TEST_FILENAME}', data.extend) with open(TEST_FILENAME, 'rb') as f: expected = f.read() self.assertEqual(data, expected, "REST offset wasn't cleared after first RETR") def test_stor_upload(self): with open(UPLOAD_FILENAME, 'wb') as f: f.write(b'This is test upload content.') with open(UPLOAD_FILENAME, 'rb') as f: resp = self.ftp.storbinary(f'STOR {UPLOAD_FILENAME}', f) self.assertTrue(resp.startswith('226') or resp.startswith('250'), f"STOR command should succeed, got: {resp}") def test_ccc_without_prereq(self): """Verify that the server rejects CCC command sent without an active TLS session.""" try: with FTP() as ftp: ftp.connect(FTP_HOST, FTP_PORT, timeout=5) ftp.login(FTP_USER, FTP_PASS) # Send CCC command in plaintext session response = ftp.voidcmd('CCC') # If the server responds with 200, that is a violation self.assertFalse(response.startswith('200'), f"Server incorrectly accepted CCC command without TLS: {response}") except error_perm as e: # Expected behavior: server rejects CCC in plaintext with 5xx error self.assertTrue(str(e).startswith('5'), f"Unexpected permission error response: {e}") except Exception as e: self.fail(f"Unexpected exception during CCC command test: {e}") def test_ccc_unprotected_reply(self): """ Verify if the server returns the correct 533 response code when CCC is sent over an unprotected connection. """ try: with FTP() as ftp: ftp.connect(FTP_HOST, FTP_PORT, timeout=5) ftp.login(FTP_USER, FTP_PASS) response = ftp.voidcmd('CCC') except error_perm as e: response = str(e) except Exception as e: self.fail(f"Unexpected exception during CCC command test: {e}") # The RFC requires the server to respond with 533 if CCC is disallowed in the current context if '533' in response: pass # Correct behavior elif response.startswith('200') or response.startswith('502'): self.fail(f"Violation: Server responded with '{response.split()[0]}' instead of '533'.") else: self.fail(f"Unexpected server response: {response}") def test_rest_offset_compliance(self): """ Verify that RETR respects REST offset and doesn't transfer the whole file. """ self.ftp.login(FTP_USER, FTP_PASS) self.ftp.sendcmd("TYPE I") FILE = "test_rest.txt" CONTENT = b"1234567890" rest_offset = 5 # Upload known content self.ftp.storbinary(f"STOR {FILE}", BytesIO(CONTENT)) # Set REST offset resp = self.ftp.sendcmd(f"REST {rest_offset}") self.assertTrue(resp.startswith('350'), f"Expected 350 response to REST, got: {resp}") # RETR after REST out = BytesIO() self.ftp.retrbinary(f"RETR {FILE}", out.write) retrieved_content = out.getvalue() expected = CONTENT[rest_offset:] # Validate self.assertEqual(retrieved_content, expected, f"REST offset not respected.\nExpected: {expected!r}\nActual : {retrieved_content!r}") # Clean up (optional, since tearDown removes test files) try: self.ftp.delete(FILE) except: pass def test_auth_reauth_violation(self): """ Verify whether the server enforces re-authentication after re-issuing the AUTH command. """ HOST = FTP_HOST PORT = FTP_PORT USER = FTP_USER PASS = FTP_PASS print("\n--- Verifying Rule: Re-authentication Required After Re-issuing AUTH Command ---") sock = None ssock = None try: # Step 1: Connect and log in in plaintext sock = socket.create_connection((HOST, PORT), timeout=5) welcome = sock.recv(1024).decode().strip() print(f"S: {welcome}") sock.sendall(f'USER {USER}\r\n'.encode()) print(f"C: USER {USER}") response_user = sock.recv(1024).decode().strip() print(f"S: {response_user}") sock.sendall(f'PASS {PASS}\r\n'.encode()) print(f"C: PASS {'*' * len(PASS)}") response_login = sock.recv(1024).decode().strip() print(f"S: {response_login}") self.assertTrue(response_login.startswith('230'), "Login failed; test cannot continue.") # Step 2: Verify logged-in status sock.sendall(b'PWD\r\n') print("C: PWD") response_pwd1 = sock.recv(1024).decode().strip() print(f"S: {response_pwd1}") self.assertTrue(response_pwd1.startswith('257'), "PWD command failed; cannot confirm login status.") # Step 3: Re-issue AUTH TLS command to reset security state sock.sendall(b'AUTH TLS\r\n') print("C: AUTH TLS") response_auth = sock.recv(1024).decode().strip() print(f"S: {response_auth}") self.assertTrue(response_auth.startswith('234'), f"AUTH TLS command failed: {response_auth}") # Wrap socket in SSL context (skip certificate verification) context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE ssock = context.wrap_socket(sock, server_hostname=HOST) print(" -> TLS handshake successful; connection is encrypted.") # Step 4: Attempt authenticated command again without re-authenticating ssock.sendall(b'PWD\r\n') print("C: PWD") response_pwd2 = ssock.recv(1024).decode().strip() print(f"S: {response_pwd2}") # Analyze result if response_pwd2.startswith('257'): self.fail("VIOLATION: Server accepted PWD command after AUTH TLS without re-authentication!") elif response_pwd2.startswith('530'): # Correct behavior: require login after re-auth pass else: self.fail(f"Unexpected server response after AUTH TLS: {response_pwd2}") except Exception as e: self.fail(f"Exception occurred during AUTH re-auth test: {e}") finally: if ssock: ssock.close() elif sock: sock.close() def test_quit(self): resp = self.ftp.quit() self.assertTrue(resp.startswith('221'), f"QUIT should respond with 221, got: {resp}") def test_invalid_command(self): try: resp = self.ftp.sendcmd('FOOBAR') self.assertTrue(resp.startswith('500') or resp.startswith('502'), f"Invalid command should respond with 5xx error, got: {resp}") except error_perm as e: self.assertTrue(str(e).startswith('500') or str(e).startswith('502'), f"Invalid command error should start with 5xx, got: {e}") if __name__ == '__main__': unittest.main()