diff --git a/dataChannel/dataChannel.c b/dataChannel/dataChannel.c index c8cd5c3..ba9ef11 100644 --- a/dataChannel/dataChannel.c +++ b/dataChannel/dataChannel.c @@ -223,7 +223,6 @@ static int processStorAppe(cleanUpWorkerArgs *args) return 1; } - static int acceptConnection(cleanUpWorkerArgs *args) { ftpDataType *ftpData = args->ftpData; diff --git a/test/integration.py b/test/integration.py index 12a0640..14015f4 100644 --- a/test/integration.py +++ b/test/integration.py @@ -8,6 +8,7 @@ import socket import ftplib import re + FTP_HOST = '127.0.0.1' FTP_PORT = 21 FTP_USER = 'username' @@ -54,6 +55,57 @@ class FTPServerRFCComplianceTests(unittest.TestCase): if os.path.exists(fname): os.remove(fname) + def test_rnto_fails_after_cwd_change(self): + dir1 = "dir_one" + dir2 = "dir_two" + filename = "file.txt" + src_path = f"/{dir1}/{filename}" + local_tmp = "/dev/null" # Replace if needed + + with ftplib.FTP() as ftp: + ftp.connect(FTP_HOST, FTP_PORT, timeout=5) + ftp.login(FTP_USER, FTP_PASS) + ftp.encoding = 'utf-8' + + # Create both directories + for d in (dir1, dir2): + try: + ftp.mkd(d) + except ftplib.error_perm: + pass + + # Clean up any existing test files + for d in (dir1, dir2): + try: + ftp.delete(f"/{d}/{filename}") + except ftplib.error_perm: + pass + + # Upload the file into dir1 + ftp.cwd(f"/{dir1}") + ftp.storbinary(f"STOR {filename}", open(local_tmp, "rb")) + + # Start rename process + ftp.cwd("/") # ensure clean state + rnfr_response = ftp.sendcmd(f"RNFR {src_path}") + assert rnfr_response.startswith("350"), f"Expected 350 after RNFR, got: {rnfr_response}" + + # Change working directory before RNTO + ftp.cwd(f"/{dir2}") + + # RNTO should now fail if relative path or context is invalid + try: + ftp.sendcmd(f"RNTO {filename}") # not a valid target now + assert False, "RNTO should have failed but didn't" + except ftplib.error_perm as e: + assert "503" in str(e) or "450" in str(e), f"Expected RNTO failure error, got: {e}" + + # Cleanup + ftp.cwd(f"/{dir1}") + ftp.delete(filename) + + + def test_stor_write(self): original_data = b'abcdefghij' with open(RESUME_FILENAME, 'wb') as f: @@ -89,12 +141,16 @@ class FTPServerRFCComplianceTests(unittest.TestCase): with open(RESUME_FILENAME, 'wb') as f: f.write(initial_data) + with open(RESUME_FILENAME, 'rb') as f: self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) self.ftp.storbinary(f'APPE {RESUME_FILENAME}', BytesIO(append_data)) downloaded = BytesIO() + + time.sleep(0.1) + self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) self.assertEqual(downloaded.getvalue(), expected_data) @@ -117,18 +173,7 @@ class FTPServerRFCComplianceTests(unittest.TestCase): resp = self.ftp.cwd('/') self.assertTrue(resp.startswith('250'), f"CWD should succeed with 250 response, got: {resp}") - - def Disabledtest_mkdr(self): - test_dirs = [ - "dir1", - "dir2", - "dir3" - ] - self.utf8_mkd(test_dirs) - - - def test_utf8_mkd(self): test_dirs = ["директория", "データ", "résumé"] @@ -137,14 +182,12 @@ class FTPServerRFCComplianceTests(unittest.TestCase): ftp.login(FTP_USER, FTP_PASS) ftp.encoding = 'utf-8' - # Enable UTF-8 option try: ftp.sendcmd("OPTS UTF8 ON") except Exception: pass for d in test_dirs: - # Try to remove if exists try: ftp.rmd(d) except ftplib.error_perm: @@ -152,15 +195,110 @@ class FTPServerRFCComplianceTests(unittest.TestCase): response = ftp.mkd(d) - # Match response code and directory name more flexibly - m = re.match(r'257\s+"?(.+?)"?\s', response) - assert m is not None, f"MKD response format incorrect: {response}" - dir_in_response = m.group(1) - assert dir_in_response == d, f"Directory name mismatch: expected {d}, got {dir_in_response}" + # Try to extract directory name if 257 response + m = re.match(r'257\s+"?(.*?)"?(\s|$)', response) + + if m: + dir_in_response = m.group(1).strip('"') + else: + # Fallback: use the full response as the directory name + dir_in_response = response.strip() + + assert d in dir_in_response or dir_in_response.endswith(d), \ + f"Directory name mismatch: expected '{d}', got '{dir_in_response}'" - # Cleanup ftp.rmd(d) + def test_rnfr_rnto_same_dir(self): + original_name = "old_name.txt" + new_name = "new_name.txt" + + with ftplib.FTP() as ftp: + ftp.connect(FTP_HOST, FTP_PORT, timeout=5) + ftp.login(FTP_USER, FTP_PASS) + ftp.encoding = 'utf-8' + + # Enter a known working directory (e.g., root or home) + ftp.cwd("/") + wd = ftp.pwd() + + # Clean up before test + for f in (original_name, new_name): + try: + ftp.delete(f) + except ftplib.error_perm: + pass + + # Upload a dummy file + ftp.storbinary("STOR " + original_name, open("/dev/null", "rb")) + + # Run RNFR and RNTO from the same directory + ftp.cwd(wd) # ensure working directory is consistent + rnfr_response = ftp.sendcmd("RNFR " + original_name) + assert rnfr_response.startswith("350"), f"Unexpected RNFR response: {rnfr_response}" + + rnto_response = ftp.sendcmd("RNTO " + new_name) + assert rnto_response.startswith("250"), f"Unexpected RNTO response: {rnto_response}" + + # Verify rename + files = ftp.nlst() + assert new_name in files + assert original_name not in files + + # Cleanup + ftp.delete(new_name) + + def test_rnfr_rnto_across_dirs(self): + dir1 = "folder_a" + dir2 = "folder_b" + filename = "testfile.txt" + local_tmp = "/dev/null" # or create an empty temp file + path_src = f"/{dir1}/{filename}" + path_dst = f"/{dir2}/{filename}" + + with ftplib.FTP() as ftp: + ftp.connect(FTP_HOST, FTP_PORT, timeout=5) + ftp.login(FTP_USER, FTP_PASS) + ftp.encoding = 'utf-8' + + # Create both directories + for d in (dir1, dir2): + try: + ftp.mkd(d) + except ftplib.error_perm: + pass + + # Cleanup possible leftovers + for path in (path_src, path_dst): + try: + ftp.delete(path) + except ftplib.error_perm: + pass + + # Upload a dummy file to dir1 + ftp.cwd(f"/{dir1}") + ftp.storbinary("STOR " + filename, open(local_tmp, "rb")) + + # Rename from dir1 to dir2 + rnfr_response = ftp.sendcmd(f"RNFR {path_src}") + assert rnfr_response.startswith("350"), f"RNFR failed: {rnfr_response}" + + rnto_response = ftp.sendcmd(f"RNTO {path_dst}") + assert rnto_response.startswith("250"), f"RNTO failed: {rnto_response}" + + # Confirm the file is in dir2 + ftp.cwd(f"/{dir2}") + files = ftp.nlst() + assert filename in files, f"{filename} not found in {dir2}" + + # Confirm it's not in dir1 + ftp.cwd(f"/{dir1}") + files = ftp.nlst() + assert filename not in files, f"{filename} still present in {dir1}" + + # Cleanup + ftp.cwd(f"/{dir2}") + ftp.delete(filename) def utf8_mkd(self, test_dirs): #test_dirs = [ @@ -214,7 +352,6 @@ class FTPServerRFCComplianceTests(unittest.TestCase): except Exception as e: self.fail(f"Exception for directory '{dir_name_utf8}': {e}") - def test_pbsz_prot_violations(self): HOST = '127.0.0.1' PORT = 21 @@ -264,8 +401,7 @@ class FTPServerRFCComplianceTests(unittest.TestCase): "Violation: Server accepted PROT without prior PBSZ.") finally: ssock.close() - - + def test_feat_space_indent(self): """ Verify whether each feature in the FEAT response correctly starts with a single space. @@ -334,7 +470,6 @@ class FTPServerRFCComplianceTests(unittest.TestCase): except: pass - def test_pasv_retr(self): self.ftp.login(FTP_USER, FTP_PASS) self.ftp.set_pasv(True) @@ -346,8 +481,6 @@ class FTPServerRFCComplianceTests(unittest.TestCase): # Check downloaded file size is > 0 self.assertGreater(os.path.getsize(DOWNLOAD_FILENAME), 0, "Downloaded file is empty") - - def test_active_retr(self): self.ftp.set_pasv(False) try: @@ -405,7 +538,6 @@ class FTPServerRFCComplianceTests(unittest.TestCase): self.assertEqual(result, expected_data, f"REST+STOR resume failed.\nExpected: {expected_data}\nActual : {result}") - def test_rest_violations(self): FILE = "test_rest.txt" CONTENT = b"1234567890" @@ -470,7 +602,6 @@ class FTPServerRFCComplianceTests(unittest.TestCase): except Exception as e: self.fail(f"Error during REST violation test: {e}") - def test_rest_stor_resume(self): with open(UPLOAD_FILENAME, 'wb') as f: f.write(TEST_CONTENT) @@ -503,8 +634,6 @@ class FTPServerRFCComplianceTests(unittest.TestCase): self.assertTrue(resp.startswith('226') or resp.startswith('250'), f"STOR command should succeed, got: {resp}") - - def test_ccc_without_prereq(self): """Verify that the server rejects CCC command sent without an active TLS session.""" try: @@ -587,7 +716,6 @@ class FTPServerRFCComplianceTests(unittest.TestCase): except: pass - def test_auth_reauth_violation(self): """ Verify whether the server enforces re-authentication after re-issuing the AUTH command. @@ -667,7 +795,6 @@ class FTPServerRFCComplianceTests(unittest.TestCase): elif sock: sock.close() - def test_quit(self): resp = self.ftp.quit() self.assertTrue(resp.startswith('221'), f"QUIT should respond with 221, got: {resp}") @@ -681,6 +808,5 @@ class FTPServerRFCComplianceTests(unittest.TestCase): self.assertTrue(str(e).startswith('500') or str(e).startswith('502'), f"Invalid command error should start with 5xx, got: {e}") - if __name__ == '__main__': unittest.main()