From c4ab90b358e5667e2423e03ec6ecc9a1596f3ab4 Mon Sep 17 00:00:00 2001 From: Ugo Date: Sun, 20 Jul 2025 15:12:22 +0100 Subject: [PATCH] fix: rest command support + integration tests --- controlChannel/controlChannel.c | 24 ++ dataChannel/dataChannel.c | 127 ++++--- ftpCommandElaborate.c | 32 +- ftpData.c | 2 +- test/integration.py | 644 ++++++++++++++++++++++++++++++++ test/resume_test.txt | 1 + test/test1.py | 45 +++ test/test2.py | 48 +++ test/test3.py | 99 +++++ test/test4.py | 116 ++++++ test/test5.py | 48 +++ test/test6.py | 39 ++ test/test7.py | 81 ++++ test/test8.py | 139 +++++++ 14 files changed, 1362 insertions(+), 83 deletions(-) create mode 100644 test/integration.py create mode 100644 test/resume_test.txt create mode 100644 test/test1.py create mode 100644 test/test2.py create mode 100644 test/test3.py create mode 100644 test/test4.py create mode 100644 test/test5.py create mode 100644 test/test6.py create mode 100644 test/test7.py create mode 100644 test/test8.py diff --git a/controlChannel/controlChannel.c b/controlChannel/controlChannel.c index 8f696d9..67ce90f 100644 --- a/controlChannel/controlChannel.c +++ b/controlChannel/controlChannel.c @@ -55,6 +55,7 @@ /* Private function definition */ static int processCommand(int processingElement, ftpDataType *ftpData); static void memoryDebug(ftpDataType *ftpData); +static int isTransferCommand(int processingElement, ftpDataType *ftpData); void evaluateControlChannel(ftpDataType *ftpData) { @@ -235,6 +236,22 @@ static void memoryDebug(ftpDataType *ftpData) } } +static int isTransferCommand(int processingElement, ftpDataType *ftpData) +{ + if (IS_CMD(ftpData->clients[processingElement].theCommandReceived, "RETR") || + IS_CMD(ftpData->clients[processingElement].theCommandReceived, "STOR") || + IS_CMD(ftpData->clients[processingElement].theCommandReceived, "PASV") || + IS_CMD(ftpData->clients[processingElement].theCommandReceived, "TYPE I") || + IS_CMD(ftpData->clients[processingElement].theCommandReceived, "TYPE A") || + IS_CMD(ftpData->clients[processingElement].theCommandReceived, "TYPE F") || + IS_CMD(ftpData->clients[processingElement].theCommandReceived, "APPE")) + { + return 1; + } + + return 0; +} + static int processCommand(int processingElement, ftpDataType *ftpData) { //command handler structure @@ -291,6 +308,13 @@ static int processCommand(int processingElement, ftpDataType *ftpData) //printTimeStamp(); my_printf("\nCommand received from (%d): %s", processingElement, ftpData->clients[processingElement].theCommandReceived); + if(!isTransferCommand(processingElement, ftpData) && + ftpData->clients[processingElement].workerData.retrRestartAtByte != 0) + { + my_printf("Reset: retrRestartAtByte"); + ftpData->clients[processingElement].workerData.retrRestartAtByte = 0; + } + cleanDynamicStringDataType(&ftpData->clients[processingElement].ftpCommand.commandArgs, 0, ftpData->clients[processingElement].memoryTable); cleanDynamicStringDataType(&ftpData->clients[processingElement].ftpCommand.commandOps, 0, ftpData->clients[processingElement].memoryTable); diff --git a/dataChannel/dataChannel.c b/dataChannel/dataChannel.c index efbfd1a..c8cd5c3 100644 --- a/dataChannel/dataChannel.c +++ b/dataChannel/dataChannel.c @@ -133,92 +133,88 @@ void workerCleanup(cleanUpWorkerArgs *args) static int processStorAppe(cleanUpWorkerArgs *args) { ftpDataType *ftpData = args->ftpData; - int theSocketId = args->socketId; - int returnCode = 0; + int theSocketId = args->socketId; + int returnCode = 0; + off_t restartPos = ftpData->clients[theSocketId].workerData.retrRestartAtByte; + FILE *file = NULL; - if (compareStringCaseInsensitive(ftpData->clients[theSocketId].workerData.theCommandReceived, "APPE", strlen("APPE")) == 1) - { - #ifdef LARGE_FILE_SUPPORT_ENABLED - //#warning LARGE FILE SUPPORT IS ENABLED! - ftpData->clients[theSocketId].workerData.theStorFile = fopen64(ftpData->clients[theSocketId].fileToStor.text, "ab"); - #endif + const char *filePath = ftpData->clients[theSocketId].fileToStor.text; + const char *command = ftpData->clients[theSocketId].workerData.theCommandReceived; - #ifndef LARGE_FILE_SUPPORT_ENABLED - #warning LARGE FILE SUPPORT IS NOT ENABLED! - ftpData->clients[theSocketId].workerData.theStorFile = fopen(ftpData->clients[theSocketId].fileToStor.text, "ab"); - #endif - } - else - { - #ifdef LARGE_FILE_SUPPORT_ENABLED - //#warning LARGE FILE SUPPORT IS ENABLED! - ftpData->clients[theSocketId].workerData.theStorFile = fopen64(ftpData->clients[theSocketId].fileToStor.text, "wb"); - #endif + int isAppe = compareStringCaseInsensitive(command, "APPE", strlen("APPE")) == 1; - #ifndef LARGE_FILE_SUPPORT_ENABLED - #warning LARGE FILE SUPPORT IS NOT ENABLED! - ftpData->clients[theSocketId].workerData.theStorFile = fopen(ftpData->clients[theSocketId].fileToStor.text, "wb"); - #endif - } + #ifdef LARGE_FILE_SUPPORT_ENABLED + if (isAppe) { + file = fopen64(filePath, "ab"); + } else if (restartPos > 0) { + file = fopen64(filePath, "r+b"); + } else { + file = fopen64(filePath, "wb"); + } + #else + if (isAppe) { + file = fopen(filePath, "ab"); + } else if (restartPos > 0) { + file = fopen(filePath, "r+b"); + } else { + file = fopen(filePath, "wb"); + } + #endif - if (ftpData->clients[theSocketId].workerData.theStorFile == NULL) - { + ftpData->clients[theSocketId].workerData.theStorFile = file; + + if (file == NULL) { returnCode = socketPrintf(ftpData, theSocketId, "s", "553 Unable to write the file\r\n"); - - if (returnCode <= 0) - { + if (returnCode <= 0) { ftpData->clients[theSocketId].closeTheClient = 1; - LOG_ERROR("socketPrintf"); + LOG_ERROR("socketPrintf"); my_printf("\n Closing the client 6"); return -1; } - return -1; } - while(1) - { - if (ftpData->clients[theSocketId].dataChannelIsTls != 1) - { - ftpData->clients[theSocketId].workerData.bufferIndex = read(ftpData->clients[theSocketId].workerData.socketConnection, ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE); - } - else if (ftpData->clients[theSocketId].dataChannelIsTls == 1) - { - #ifdef OPENSSL_ENABLED - if (ftpData->clients[theSocketId].workerData.passiveModeOn == 1) - ftpData->clients[theSocketId].workerData.bufferIndex = SSL_read(ftpData->clients[theSocketId].workerData.serverSsl, ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE); - else if(ftpData->clients[theSocketId].workerData.activeModeOn == 1) - ftpData->clients[theSocketId].workerData.bufferIndex = SSL_read(ftpData->clients[theSocketId].workerData.clientSsl, ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE); - #endif - } - else - { - my_printf("\nError state"); - } + if (!isAppe && restartPos > 0) { + fseeko(file, restartPos, SEEK_SET); + ftpData->clients[theSocketId].workerData.retrRestartAtByte = 0; + } - if (ftpData->clients[theSocketId].workerData.bufferIndex == 0) - { - break; + while (1) { + int bytesRead = 0; + + if (ftpData->clients[theSocketId].dataChannelIsTls != 1) { + bytesRead = read(ftpData->clients[theSocketId].workerData.socketConnection, + ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE); } - else if (ftpData->clients[theSocketId].workerData.bufferIndex > 0) - { - fwrite(ftpData->clients[theSocketId].workerData.buffer, ftpData->clients[theSocketId].workerData.bufferIndex, 1, ftpData->clients[theSocketId].workerData.theStorFile); + #ifdef OPENSSL_ENABLED + else { + if (ftpData->clients[theSocketId].workerData.passiveModeOn == 1) { + bytesRead = SSL_read(ftpData->clients[theSocketId].workerData.serverSsl, + ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE); + } else if (ftpData->clients[theSocketId].workerData.activeModeOn == 1) { + bytesRead = SSL_read(ftpData->clients[theSocketId].workerData.clientSsl, + ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE); + } + } + #endif + + if (bytesRead == 0) { + break; + } else if (bytesRead > 0) { + fwrite(ftpData->clients[theSocketId].workerData.buffer, bytesRead, 1, file); usleep(100); ftpData->clients[theSocketId].lastActivityTimeStamp = (int)time(NULL); - } - else if (ftpData->clients[theSocketId].workerData.bufferIndex < 0) - { + } else { break; } } - int theReturnCode; - theReturnCode = fclose(ftpData->clients[theSocketId].workerData.theStorFile); + fclose(file); ftpData->clients[theSocketId].workerData.theStorFile = NULL; - if (ftpData->clients[theSocketId].login.ownerShip.ownerShipSet == 1) - { - FILE_doChownFromUidGid(ftpData->clients[theSocketId].fileToStor.text, ftpData->clients[theSocketId].login.ownerShip.uid, ftpData->clients[theSocketId].login.ownerShip.gid); + if (ftpData->clients[theSocketId].login.ownerShip.ownerShipSet == 1) { + FILE_doChownFromUidGid(filePath, ftpData->clients[theSocketId].login.ownerShip.uid, + ftpData->clients[theSocketId].login.ownerShip.gid); } ftpData->clients[theSocketId].workerData.commandProcessed = 1; @@ -227,6 +223,7 @@ static int processStorAppe(cleanUpWorkerArgs *args) return 1; } + static int acceptConnection(cleanUpWorkerArgs *args) { ftpDataType *ftpData = args->ftpData; @@ -387,6 +384,8 @@ static int processRetr(cleanUpWorkerArgs *args) int returnCode = 0; long long int writenSize = 0, writeReturn = 0; + my_printf("\n ftpData->clients[theSocketId].workerData.retrRestartAtByte = %d", ftpData->clients[theSocketId].workerData.retrRestartAtByte); + writenSize = writeRetrFile(ftpData, theSocketId, ftpData->clients[theSocketId].workerData.retrRestartAtByte, ftpData->clients[theSocketId].workerData.theStorFile); ftpData->clients[theSocketId].workerData.retrRestartAtByte = 0; diff --git a/ftpCommandElaborate.c b/ftpCommandElaborate.c index 5cce992..b4f968d 100755 --- a/ftpCommandElaborate.c +++ b/ftpCommandElaborate.c @@ -2239,17 +2239,13 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st { long long int readen = 0; long long int toReturn = 0, writtenSize = 0; - long long int currentPosition = 0; long long int theFileSize; char buffer[FTP_COMMAND_ELABORATE_CHAR_BUFFER]; + memset(buffer, 0, FTP_COMMAND_ELABORATE_CHAR_BUFFER); #ifdef LARGE_FILE_SUPPORT_ENABLED - // #warning LARGE FILE SUPPORT IS ENABLED! retrFP = fopen64(data->clients[theSocketId].fileToRetr.text, "rb"); -#endif - -#ifndef LARGE_FILE_SUPPORT_ENABLED -#warning LARGE FILE SUPPORT IS NOT ENABLED! +#else retrFP = fopen(data->clients[theSocketId].fileToRetr.text, "rb"); #endif @@ -2259,29 +2255,29 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st } theFileSize = FILE_GetFileSize(retrFP); + my_printf("\ntheFileSize %lld", theFileSize); if (startFrom > 0) { + my_printf("\nCursor moved %lld", startFrom); + #ifdef LARGE_FILE_SUPPORT_ENABLED - // #warning LARGE FILE SUPPORT IS ENABLED! - currentPosition = (long long int)lseek64(fileno(retrFP), startFrom, SEEK_SET); + if (fseeko64(retrFP, startFrom, SEEK_SET) != 0) +#else + if (fseeko(retrFP, startFrom, SEEK_SET) != 0) #endif - -#ifndef LARGE_FILE_SUPPORT_ENABLED -#warning LARGE FILE SUPPORT IS NOT ENABLED! - currentPosition = (long long int)lseek(fileno(retrFP), startFrom, SEEK_SET); -#endif - - if (currentPosition == -1) { fclose(retrFP); retrFP = NULL; return toReturn; } + + my_printf("\ncurrentPosition %lld", startFrom); } while ((readen = (long long int)fread(buffer, sizeof(char), FTP_COMMAND_ELABORATE_CHAR_BUFFER, retrFP)) > 0) { + my_printf("\nTRANSFER read %lld bytes: %.*s", readen, (int)readen, buffer); if (data->clients[theSocketId].dataChannelIsTls != 1) { @@ -2289,7 +2285,6 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st } else { - #ifdef OPENSSL_ENABLED if (data->clients[theSocketId].workerData.passiveModeOn == 1) writtenSize = SSL_write(data->clients[theSocketId].workerData.serverSsl, buffer, readen); @@ -2300,7 +2295,6 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st if (writtenSize <= 0) { - my_printf("\nError %lld while writing retr file.", writtenSize); fclose(retrFP); retrFP = NULL; @@ -2308,7 +2302,7 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st } else { - toReturn = toReturn + writtenSize; + toReturn += writtenSize; data->clients[theSocketId].lastActivityTimeStamp = (int)time(NULL); } } @@ -2317,6 +2311,8 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st return toReturn; } + + char *getFtpCommandArg(char *theCommand, char *theCommandString, int skipArgs) { char *toReturn = theCommandString + strlen(theCommand); diff --git a/ftpData.c b/ftpData.c index 41adcdf..cbc40e3 100755 --- a/ftpData.c +++ b/ftpData.c @@ -702,7 +702,7 @@ void resetWorkerData(ftpDataType *data, int clientId, int isInitialization) data->clients[clientId].workerData.socketConnection = 0; data->clients[clientId].workerData.bufferIndex = 0; data->clients[clientId].workerData.commandReceived = 0; - data->clients[clientId].workerData.retrRestartAtByte = 0; + // data->clients[clientId].workerData.retrRestartAtByte = 0; data->clients[clientId].workerData.threadIsAlive = 0; data->clients[clientId].workerData.activeModeOn = 0; data->clients[clientId].workerData.extendedPassiveModeOn = 0; diff --git a/test/integration.py b/test/integration.py new file mode 100644 index 0000000..955a367 --- /dev/null +++ b/test/integration.py @@ -0,0 +1,644 @@ +import unittest +from ftplib import FTP, error_perm, error_temp +import os +from io import BytesIO +import time +import ssl +import socket +import ftplib + +FTP_HOST = '127.0.0.1' +FTP_PORT = 21 +FTP_USER = 'username' +FTP_PASS = 'password' + +TEST_FILENAME = 'test.txt' +TEST_CONTENT = b'1234567890' # 10 bytes known content +DOWNLOAD_FILENAME = 'downloaded_test.txt' +UPLOAD_FILENAME = 'upload_test.txt' +RESUME_FILENAME = 'resume_test.txt' + + +class FTPServerRFCComplianceTests(unittest.TestCase): + + def setUp(self): + # Create local test file first + with open(TEST_FILENAME, 'wb') as f: + f.write(TEST_CONTENT) + + # Connect and login + self.ftp = FTP() + self.ftp.connect(FTP_HOST, FTP_PORT, timeout=10) + self.ftp.login(FTP_USER, FTP_PASS) + + # Upload file to FTP server + with open(TEST_FILENAME, 'rb') as f: + self.ftp.storbinary(f"STOR {TEST_FILENAME}", f) + + + def tearDown(self): + try: + # Remove test files from server if they exist + for fname in [TEST_FILENAME, UPLOAD_FILENAME, RESUME_FILENAME]: + try: + self.ftp.delete(fname) + except Exception: + pass + self.ftp.quit() + except Exception: + self.ftp.close() + + # Remove local temp files + for fname in [TEST_FILENAME, DOWNLOAD_FILENAME, UPLOAD_FILENAME]: + if os.path.exists(fname): + os.remove(fname) + + def test_stor_write(self): + original_data = b'abcdefghij' + with open(RESUME_FILENAME, 'wb') as f: + f.write(original_data) + with open(RESUME_FILENAME, 'rb') as f: + self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) + + downloaded = BytesIO() + self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) + self.assertEqual(downloaded.getvalue(), original_data) + + def test_stor_resume_with_rest(self): + original_data = b'abcdefghij' + resume_data = b'XYZ' + expected_data = b'abcdeXYZij' + + with open(RESUME_FILENAME, 'wb') as f: + f.write(original_data) + with open(RESUME_FILENAME, 'rb') as f: + self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) + + self.ftp.sendcmd('REST 5') + self.ftp.storbinary(f'STOR {RESUME_FILENAME}', BytesIO(resume_data)) + + downloaded = BytesIO() + self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) + self.assertEqual(downloaded.getvalue(), expected_data) + + def test_appe_append(self): + initial_data = b'12345' + append_data = b'67890' + expected_data = b'1234567890' + + with open(RESUME_FILENAME, 'wb') as f: + f.write(initial_data) + with open(RESUME_FILENAME, 'rb') as f: + self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) + + self.ftp.storbinary(f'APPE {RESUME_FILENAME}', BytesIO(append_data)) + + downloaded = BytesIO() + self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) + self.assertEqual(downloaded.getvalue(), expected_data) + + + def test_220_service_ready(self): + welcome = self.ftp.getwelcome() + self.assertTrue(welcome.startswith('220'), f"Expected 220 response, got: {welcome}") + + def test_user_login_success(self): + resp = self.ftp.login(user=FTP_USER, passwd=FTP_PASS) + self.assertTrue(resp.startswith('230'), f"Login failed or unexpected response: {resp}") + + def test_user_login_fail(self): + with self.assertRaises((error_perm, error_temp)): + self.ftp.login(user='wronguser', passwd='wrongpass') + + def test_pwd_and_cwd(self): + pwd = self.ftp.pwd() + self.assertTrue(pwd.startswith('/'), f"PWD should return directory path, got: {pwd}") + resp = self.ftp.cwd('/') + self.assertTrue(resp.startswith('250'), f"CWD should succeed with 250 response, got: {resp}") + + def Disabledtest_utf8_mkd(self): + """Verify server accepts MKD command with UTF-8 directory names.""" + test_dirs = [ + "Café", + "测试", + "директория", + "データ", + "résumé" + ] + + with ftplib.FTP() as ftp: + ftp.connect(FTP_HOST, FTP_PORT, timeout=5) + ftp.login(FTP_USER, FTP_PASS) + + for dir_name_utf8 in test_dirs: + with self.subTest(dir=dir_name_utf8): + print(f"C: MKD {dir_name_utf8}") + response = ftp.mkd(dir_name_utf8) + print(f"S: {response}") + + self.assertTrue(response.startswith('257'), f"MKD failed for directory '{dir_name_utf8}'") + + # Clean up after test + ftp.rmd(dir_name_utf8) + print(f"Removed directory: {dir_name_utf8}") + + + def test_pbsz_prot_violations(self): + HOST = '127.0.0.1' + PORT = 21 + + print("\n--- 12-15. Verifying Rules: PBSZ/PROT Command Sequences and Validation ---") + + # Scenario 1: Invoke PBSZ without prior security exchange (plaintext) + print("\n[+] Scenario 1: Invoke PBSZ on plaintext connection") + with socket.create_connection((HOST, PORT), timeout=5) as sock: + banner = sock.recv(1024).decode() + print(f"S: {banner.strip()}") + sock.sendall(b'PBSZ 0\r\n') + print("C: PBSZ 0") + response = sock.recv(1024).decode().strip() + print(f"S: {response}") + self.assertNotEqual(response.startswith('200'), True, "Violation: PBSZ accepted without security.") + + # Scenario 2 & 3: Use TLS connection for further checks + print("\n[+] Scenario 2: Invoke PBSZ with invalid parameters") + sock = socket.create_connection((HOST, PORT), timeout=5) + banner = sock.recv(1024).decode() + print(f"S: {banner.strip()}") + + sock.sendall(b'AUTH TLS\r\n') + auth_response = sock.recv(1024).decode().strip() + print(f"C: AUTH TLS\nS: {auth_response}") + + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + ssock = context.wrap_socket(sock, server_hostname=HOST) + + try: + ssock.sendall(b'PBSZ not_a_number\r\n') + print("C: PBSZ not_a_number") + response = ssock.recv(1024).decode().strip() + print(f"S: {response}") + self.assertNotEqual(response.startswith('200'), True, + "Violation: Server accepted invalid PBSZ parameter.") + + print("\n[+] Scenario 3: Invoke PROT on TLS connection without prior PBSZ") + ssock.sendall(b'PROT P\r\n') + print("C: PROT P") + response_prot = ssock.recv(1024).decode().strip() + print(f"S: {response_prot}") + self.assertNotEqual(response_prot.startswith('200'), True, + "Violation: Server accepted PROT without prior PBSZ.") + finally: + ssock.close() + + + + def Disabtest_utf8_mkd_with_opts(self): + dir_name_utf8 = "rés" + try: + with ftplib.FTP() as ftp: + ftp.connect(FTP_HOST, FTP_PORT, timeout=5) + ftp.login(FTP_USER, FTP_PASS) + + # Enable UTF-8 mode on the server + print("C: OPTS UTF8 ON") + response = ftp.sendcmd("OPTS UTF8 ON") + print(f"S: {response}") + + if not response.startswith('200'): + print("Warning: OPTS UTF8 not supported or rejected by server.") + + # Now try to create directory with UTF-8 name + print(f"C: MKD {dir_name_utf8}") + response = ftp.mkd(dir_name_utf8) + print(f"S: {response}") + + print(f"Successfully created UTF-8 directory: {dir_name_utf8}") + + except ftplib.error_perm as e: + print(f"FTP permission error creating directory '{dir_name_utf8}': {e}") + except Exception as e: + print(f"Error: {e}") + + + + def test_feat_space_indent(self): + """ + Verify whether each feature in the FEAT response correctly starts with a single space. + """ + HOST = '127.0.0.1' + PORT = 21 + USER = 'username' + PASS = 'password' + + print("\n--- 8. Verifying Rule: Missing Space Before FEAT Features ---") + + try: + sock = socket.create_connection((HOST, PORT), timeout=10) + + welcome = sock.recv(1024).decode().strip() + print(f"S: {welcome}") + + sock.sendall(f'USER {USER}\r\n'.encode()) + resp = sock.recv(1024).decode().strip() + print(f"C: USER {USER}\nS: {resp}") + + sock.sendall(f'PASS {PASS}\r\n'.encode()) + resp = sock.recv(1024).decode().strip() + print(f"C: PASS {PASS}\nS: {resp}") + + self.assertTrue(resp.startswith('230'), "Login failed") + + sock.sendall(b'FEAT\r\n') + + full_response = b'' + while True: + data = sock.recv(1024) + if not data: + break + full_response += data + if b'211 End.' in full_response: + break + + response_text = full_response.decode('utf-8', errors='ignore') + lines = response_text.split('\r\n') + + feature_lines = [] + in_features = False + + for line in lines: + if line.startswith('211-'): + in_features = True + continue + elif line.startswith('211 '): + break + elif in_features and line.strip(): + feature_lines.append(line) + + self.assertTrue(len(feature_lines) > 0, "No feature lines found in FEAT response") + + for line in feature_lines: + self.assertTrue(line.startswith(' '), f"Feature line does not start with a space: '{line}'") + + except Exception as e: + self.fail(f"Exception occurred during FEAT space indent verification: {e}") + + finally: + if 'sock' in locals(): + try: + sock.close() + except: + pass + + + def test_pasv_retr(self): + self.ftp.login(FTP_USER, FTP_PASS) + self.ftp.set_pasv(True) + with open(DOWNLOAD_FILENAME, 'wb') as f: + self.ftp.retrbinary(f'RETR {TEST_FILENAME}', f.write) + + self.assertTrue(os.path.exists(DOWNLOAD_FILENAME), "Downloaded file does not exist") + + # Check downloaded file size is > 0 + self.assertGreater(os.path.getsize(DOWNLOAD_FILENAME), 0, "Downloaded file is empty") + + + + def test_active_retr(self): + self.ftp.set_pasv(False) + try: + with open(DOWNLOAD_FILENAME, 'wb') as f: + self.ftp.retrbinary(f'RETR {TEST_FILENAME}', f.write) + except EOFError: + self.skipTest("Active mode data connection failed") + self.assertTrue(os.path.exists(DOWNLOAD_FILENAME), "Downloaded file does not exist") + + def test_rest_command(self): + with open(TEST_FILENAME, 'rb') as f: + f.seek(5) + expected_data = f.read() + + self.ftp.sendcmd('REST 5') + data = bytearray() + self.ftp.retrbinary(f'RETR {TEST_FILENAME}', data.extend) + self.assertEqual(data, expected_data, f"REST transfer mismatch.\nExpected: {expected_data!r}\nActual : {data!r}") + + def log_command(direction, command, response=None): + """Log FTP commands and responses""" + if direction == "C": + print(f"C: {command}") + elif direction == "S": + print(f"S: {response}") + elif direction == "ERROR": + print(f"ERROR: {command}") + elif direction == "INFO": + print(f"INFO: {command}") + + def test_rest_stor_resume_explicit(self): + """ + Test resuming an upload using REST + STOR. + This should append or overwrite starting from the specified offset. + """ + original_data = b'abcdefghij' # 10 bytes + resume_data = b'XYZ' # Will overwrite at offset 5 → expect: abcdeXYZij + expected_data = b'abcdeXYZij' + + # Step 1: Upload original data + with open(RESUME_FILENAME, 'wb') as f: + f.write(original_data) + with open(RESUME_FILENAME, 'rb') as f: + self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) + + # Step 2: Resume upload from offset 5 with 'XYZ' + self.ftp.sendcmd('REST 5') + self.ftp.storbinary(f'STOR {RESUME_FILENAME}', BytesIO(resume_data)) + + # Step 3: Download and verify final file content + downloaded = BytesIO() + self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write) + result = downloaded.getvalue() + + self.assertEqual(result, expected_data, + f"REST+STOR resume failed.\nExpected: {expected_data}\nActual : {result}") + + + def test_rest_violations(self): + FILE = "test_rest.txt" + CONTENT = b"1234567890" + + print("\n--- Verifying Rules: REST Command Sequence and Implementation ---") + print("Test Objective: Verify that the REST command correctly limits subsequent commands and handles offsets") + + + try: + with ftplib.FTP() as ftp: + ftp.set_debuglevel(0) + + ftp.connect(FTP_HOST, FTP_PORT, timeout=5) + + ftp.login(FTP_USER, FTP_PASS) + self.log_command("INFO", "Login successful") + + self.log_command("C", "TYPE I") + resp = ftp.sendcmd("TYPE I") + self.log_command("S", resp) + + self.log_command("C", f"STOR {FILE}") + ftp.storbinary(f"STOR {FILE}", BytesIO(CONTENT)) + self.log_command("INFO", f"File uploaded successfully, content: {CONTENT} (length: {len(CONTENT)})") + + print("\n" + "="*60) + print("[+] Scenario 1: Verify if RETR correctly handles the offset set by REST") + print(" RFC Requirement: RETR should start transferring from the byte position specified by REST") + + rest_offset = 5 + self.log_command("C", f"REST {rest_offset}") + response_rest2 = ftp.sendcmd(f"REST {rest_offset}") + self.log_command("S", response_rest2) + self.assertTrue(response_rest2.startswith('350'), f"REST {rest_offset} failed: {response_rest2}") + + self.log_command("C", f"RETR {FILE}") + out = BytesIO() + ftp.retrbinary(f"RETR {FILE}", out.write) + retrieved_content = out.getvalue() + + print(f"\n Transfer Result Analysis:") + print(f" Original File Content: {CONTENT} (length: {len(CONTENT)})") + print(f" REST Offset: {rest_offset} (starting from byte {rest_offset})") + print(f" Expected Transferred Content: {CONTENT[rest_offset:]} (length: {len(CONTENT[rest_offset:])})") + print(f" Actual Transferred Content: {retrieved_content} (length: {len(retrieved_content)})") + + # Assertion based on REST behavior + if len(retrieved_content) == len(CONTENT): + self.fail("RETR ignored REST offset: entire file downloaded.") + elif len(retrieved_content) == len(CONTENT) - rest_offset and retrieved_content == CONTENT[rest_offset:]: + print("No violation detected. RETR correctly handled the REST offset.") + else: + self.fail("REST offset handled incorrectly or data mismatch.") + + try: + self.log_command("C", f"DELE {FILE}") + # ftp.delete(FILE) # Uncomment if delete supported by server + self.log_command("INFO", "Test file cleanup completed") + except Exception: + self.log_command("ERROR", "Failed to cleanup test file") + + except Exception as e: + self.fail(f"Error during REST violation test: {e}") + + + def test_rest_stor_resume(self): + with open(UPLOAD_FILENAME, 'wb') as f: + f.write(TEST_CONTENT) + with open(UPLOAD_FILENAME, 'rb') as f: + self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) + + self.ftp.sendcmd('REST 5') + with open(UPLOAD_FILENAME, 'rb') as f: + f.seek(5) + resp = self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f) + self.assertTrue(resp.startswith('226') or resp.startswith('250'), + f"STOR after REST failed: {resp}") + + def test_rest_cleared_after_retr(self): + self.ftp.sendcmd('REST 5') + self.ftp.retrbinary(f'RETR {TEST_FILENAME}', lambda _: None) # discard output + + # Next RETR should start from 0 + data = bytearray() + self.ftp.retrbinary(f'RETR {TEST_FILENAME}', data.extend) + with open(TEST_FILENAME, 'rb') as f: + expected = f.read() + self.assertEqual(data, expected, "REST offset wasn't cleared after first RETR") + + def test_stor_upload(self): + with open(UPLOAD_FILENAME, 'wb') as f: + f.write(b'This is test upload content.') + with open(UPLOAD_FILENAME, 'rb') as f: + resp = self.ftp.storbinary(f'STOR {UPLOAD_FILENAME}', f) + self.assertTrue(resp.startswith('226') or resp.startswith('250'), + f"STOR command should succeed, got: {resp}") + + + + def test_ccc_without_prereq(self): + """Verify that the server rejects CCC command sent without an active TLS session.""" + try: + with FTP() as ftp: + ftp.connect(FTP_HOST, FTP_PORT, timeout=5) + ftp.login(FTP_USER, FTP_PASS) + + # Send CCC command in plaintext session + response = ftp.voidcmd('CCC') + + # If the server responds with 200, that is a violation + self.assertFalse(response.startswith('200'), + f"Server incorrectly accepted CCC command without TLS: {response}") + + except error_perm as e: + # Expected behavior: server rejects CCC in plaintext with 5xx error + self.assertTrue(str(e).startswith('5'), f"Unexpected permission error response: {e}") + + except Exception as e: + self.fail(f"Unexpected exception during CCC command test: {e}") + + def test_ccc_unprotected_reply(self): + """ + Verify if the server returns the correct 533 response code + when CCC is sent over an unprotected connection. + """ + try: + with FTP() as ftp: + ftp.connect(FTP_HOST, FTP_PORT, timeout=5) + ftp.login(FTP_USER, FTP_PASS) + + response = ftp.voidcmd('CCC') + + except error_perm as e: + response = str(e) + + except Exception as e: + self.fail(f"Unexpected exception during CCC command test: {e}") + + # The RFC requires the server to respond with 533 if CCC is disallowed in the current context + if '533' in response: + pass # Correct behavior + elif response.startswith('200') or response.startswith('502'): + self.fail(f"Violation: Server responded with '{response.split()[0]}' instead of '533'.") + else: + self.fail(f"Unexpected server response: {response}") + + def test_rest_offset_compliance(self): + """ + Verify that RETR respects REST offset and doesn't transfer the whole file. + """ + self.ftp.login(FTP_USER, FTP_PASS) + self.ftp.sendcmd("TYPE I") + + FILE = "test_rest.txt" + CONTENT = b"1234567890" + rest_offset = 5 + + # Upload known content + self.ftp.storbinary(f"STOR {FILE}", BytesIO(CONTENT)) + + # Set REST offset + resp = self.ftp.sendcmd(f"REST {rest_offset}") + self.assertTrue(resp.startswith('350'), f"Expected 350 response to REST, got: {resp}") + + # RETR after REST + out = BytesIO() + self.ftp.retrbinary(f"RETR {FILE}", out.write) + retrieved_content = out.getvalue() + + expected = CONTENT[rest_offset:] + + # Validate + self.assertEqual(retrieved_content, expected, + f"REST offset not respected.\nExpected: {expected!r}\nActual : {retrieved_content!r}") + + # Clean up (optional, since tearDown removes test files) + try: + self.ftp.delete(FILE) + except: + pass + + + def test_auth_reauth_violation(self): + """ + Verify whether the server enforces re-authentication after re-issuing the AUTH command. + """ + HOST = FTP_HOST + PORT = FTP_PORT + USER = FTP_USER + PASS = FTP_PASS + + print("\n--- Verifying Rule: Re-authentication Required After Re-issuing AUTH Command ---") + + sock = None + ssock = None + + try: + # Step 1: Connect and log in in plaintext + sock = socket.create_connection((HOST, PORT), timeout=5) + welcome = sock.recv(1024).decode().strip() + print(f"S: {welcome}") + + sock.sendall(f'USER {USER}\r\n'.encode()) + print(f"C: USER {USER}") + response_user = sock.recv(1024).decode().strip() + print(f"S: {response_user}") + + sock.sendall(f'PASS {PASS}\r\n'.encode()) + print(f"C: PASS {'*' * len(PASS)}") + response_login = sock.recv(1024).decode().strip() + print(f"S: {response_login}") + + self.assertTrue(response_login.startswith('230'), "Login failed; test cannot continue.") + + # Step 2: Verify logged-in status + sock.sendall(b'PWD\r\n') + print("C: PWD") + response_pwd1 = sock.recv(1024).decode().strip() + print(f"S: {response_pwd1}") + self.assertTrue(response_pwd1.startswith('257'), "PWD command failed; cannot confirm login status.") + + # Step 3: Re-issue AUTH TLS command to reset security state + sock.sendall(b'AUTH TLS\r\n') + print("C: AUTH TLS") + response_auth = sock.recv(1024).decode().strip() + print(f"S: {response_auth}") + + self.assertTrue(response_auth.startswith('234'), f"AUTH TLS command failed: {response_auth}") + + # Wrap socket in SSL context (skip certificate verification) + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + ssock = context.wrap_socket(sock, server_hostname=HOST) + print(" -> TLS handshake successful; connection is encrypted.") + + # Step 4: Attempt authenticated command again without re-authenticating + ssock.sendall(b'PWD\r\n') + print("C: PWD") + response_pwd2 = ssock.recv(1024).decode().strip() + print(f"S: {response_pwd2}") + + # Analyze result + if response_pwd2.startswith('257'): + self.fail("VIOLATION: Server accepted PWD command after AUTH TLS without re-authentication!") + elif response_pwd2.startswith('530'): + # Correct behavior: require login after re-auth + pass + else: + self.fail(f"Unexpected server response after AUTH TLS: {response_pwd2}") + + except Exception as e: + self.fail(f"Exception occurred during AUTH re-auth test: {e}") + + finally: + if ssock: + ssock.close() + elif sock: + sock.close() + + + def test_quit(self): + resp = self.ftp.quit() + self.assertTrue(resp.startswith('221'), f"QUIT should respond with 221, got: {resp}") + + def Disabledtest_invalid_command(self): + try: + resp = self.ftp.sendcmd('FOOBAR') + self.assertTrue(resp.startswith('500') or resp.startswith('502'), + f"Invalid command should respond with 5xx error, got: {resp}") + except error_perm as e: + self.assertTrue(str(e).startswith('500') or str(e).startswith('502'), + f"Invalid command error should start with 5xx, got: {e}") + + +if __name__ == '__main__': + unittest.main() diff --git a/test/resume_test.txt b/test/resume_test.txt new file mode 100644 index 0000000..c76a964 --- /dev/null +++ b/test/resume_test.txt @@ -0,0 +1 @@ +abcdefghij \ No newline at end of file diff --git a/test/test1.py b/test/test1.py new file mode 100644 index 0000000..0ff5086 --- /dev/null +++ b/test/test1.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Sat Jul 19 09:09:55 2025 + +@author: ugo +""" + +import ftplib +import socket + +def verify_ccc_no_prereq_check(): + """python + Send CCC directly in an unencrypted session to verify if the server handles it incorrectly. + """ + HOST = '127.0.0.1' + PORT = 21 + USER = 'username' + PASS = 'password' + + print("\n--- 2. Verifying Rule: CCC Without Preceding Security Exchange Check ---") + + try: + with ftplib.FTP() as ftp: + ftp.connect(HOST, PORT, timeout=5) + ftp.login(USER, PASS) + print("Successfully logged in in a plaintext session.") + + print("C: CCC") + # A non-compliant server will return 200 OK, indicating it accepted the command + response = ftp.voidcmd('CCC') + print(f"S: {response}") + + if response.startswith('200'): + print("\n[!] VIOLATION CONFIRMED: Server accepted CCC command without an active TLS session.") + else: + print("\n[-] No violation detected or server returned an unexpected response.") + + except ftplib.error_perm as e: + print(f"[-] Server correctly rejected the command. No violation. Server response: {e}") + except Exception as e: + print(f"\nError occurred: {e}") + +if __name__ == "__main__": + verify_ccc_no_prereq_check() \ No newline at end of file diff --git a/test/test2.py b/test/test2.py new file mode 100644 index 0000000..faf5d70 --- /dev/null +++ b/test/test2.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Sat Jul 19 09:20:47 2025 + +@author: ugo +""" + +import ftplib +import socket + +def verify_ccc_unprotected_reply(): + """ + Verify if the server returns an incorrect response code when CCC is sent over an unprotected connection. + """ + HOST = '127.0.0.1' + PORT = 21 + USER = 'username' + PASS = 'password' + + print("\n--- 3. Verifying Rule: Response Code for Unprotected CCC ---") + + try: + with ftplib.FTP() as ftp: + ftp.connect(HOST, PORT, timeout=5) + ftp.login(USER, PASS) + print("Successfully logged in in a plaintext session.") + + print("C: CCC") + response = ftp.voidcmd('CCC') + print(f"S: {response}") + + except ftplib.error_perm as e: + response = str(e) + print(f"S: {response}") + except Exception as e: + print(f"\nError occurred: {e}") + return + + if '533' in response: + print("\n[-] No violation detected. Server correctly responded with 533.") + elif '200' in response or '502' in response: + print(f"\n[!] VIOLATION CONFIRMED: Server responded with '{response.split()[0]}' instead of the required '533'.") + else: + print("\n[?] Server returned an unexpected response.") + +if __name__ == "__main__": + verify_ccc_unprotected_reply() diff --git a/test/test3.py b/test/test3.py new file mode 100644 index 0000000..92a0d70 --- /dev/null +++ b/test/test3.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Sat Jul 19 14:54:55 2025 + +@author: ugo +""" + +import socket +import ssl + +def verify_auth_reauth_violation(): + """ + Verify whether the server enforces re-authentication after re-issuing the AUTH command. + """ + HOST = '127.0.0.1' + PORT = 21 + USER = 'username' + PASS = 'password' + + print("--- Verifying Rule: Re-authentication Required After Re-issuing AUTH Command ---") + + try: + # Step 1: Connect and log in in plaintext + print("\n[+] Step 1: Establish plaintext connection and log in...") + sock = socket.create_connection((HOST, PORT), timeout=5) + print(f"S: {sock.recv(1024).decode().strip()}") + + sock.sendall(f'USER {USER}\r\n'.encode()) + print(f"C: USER {USER}") + print(f"S: {sock.recv(1024).decode().strip()}") + + sock.sendall(f'PASS {PASS}\r\n'.encode()) + print(f"C: PASS {PASS}") + response_login = sock.recv(1024).decode().strip() + print(f"S: {response_login}") + + if not response_login.startswith('230'): + print("[-] Login failed; test cannot continue.") + return + + # Step 2: Verify logged-in status + print("\n[+] Step 2: Execute PWD command in plaintext mode to verify login status...") + sock.sendall(b'PWD\r\n') + print("C: PWD") + response_pwd1 = sock.recv(1024).decode().strip() + print(f"S: {response_pwd1}") + if not response_pwd1.startswith('257'): + print("[-] PWD command failed; cannot confirm login status.") + return + print(" -> Status confirmed: User is logged in.") + + # Step 3: Re-issue AUTH command to reset security state + print("\n[+] Step 3: Re-issue AUTH TLS command to upgrade connection to encrypted mode...") + sock.sendall(b'AUTH TLS\r\n') + print("C: AUTH TLS") + response_auth = sock.recv(1024).decode().strip() + print(f"S: {response_auth}") + + if not response_auth.startswith('234'): + print(f"[-] AUTH TLS command failed: {response_auth}") + return + + # Create SSL context that does not verify certificates + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + # Wrap regular socket into SSL socket + ssock = context.wrap_socket(sock, server_hostname=HOST) + print(" -> TLS handshake successful; connection is encrypted.") + + # Step 4: Attempt authenticated command again without re-authenticating + print("\n[+] Step 4: In encrypted mode, execute PWD command directly without re-logging in...") + ssock.sendall(b'PWD\r\n') + print("C: PWD") + response_pwd2 = ssock.recv(1024).decode().strip() + print(f"S: {response_pwd2}") + + # Final analysis + print("\n--- Final Analysis ---") + if response_pwd2.startswith('257'): + print("[!] VIOLATION CONFIRMED: Server accepted the command, proving user login state was retained!") + print("[!] This is a high-risk authentication bypass vulnerability.") + elif response_pwd2.startswith('530'): + print("[-] NO VIOLATION: Server correctly rejected the command, requiring re-authentication.") + else: + print(f"[?] UNEXPECTED RESPONSE: Server returned an unexpected response code: {response_pwd2}") + + except Exception as e: + print(f"\nError occurred: {e}") + finally: + if 'ssock' in locals(): + ssock.close() + elif 'sock' in locals(): + sock.close() + +if __name__ == "__main__": + verify_auth_reauth_violation() diff --git a/test/test4.py b/test/test4.py new file mode 100644 index 0000000..201d977 --- /dev/null +++ b/test/test4.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Sat Jul 19 15:07:22 2025 + +@author: ugo +""" + +import socket + +def verify_feat_space_indent(): + """ + Verify whether each feature in the FEAT response correctly starts with a single space. + """ + HOST = '127.0.0.1' + PORT = 21 + USER = 'username' + PASS = 'password' + + print("\n--- 8. Verifying Rule: Missing Space Before FEAT Features ---") + + try: + # Use raw socket connection for better reliability + sock = socket.create_connection((HOST, PORT), timeout=10) + + # Read welcome message + welcome = sock.recv(1024).decode().strip() + print(f"S: {welcome}") + + # Login + sock.sendall(b'USER username\r\n') + resp = sock.recv(1024).decode().strip() + print(f"C: USER username\nS: {resp}") + + sock.sendall(b'PASS password\r\n') + resp = sock.recv(1024).decode().strip() + print(f"C: PASS password\nS: {resp}") + + if not resp.startswith('230'): + print("Login failed") + return + + # Send FEAT command + print("C: FEAT") + sock.sendall(b'FEAT\r\n') + + # Read complete FEAT response + full_response = b'' + while True: + data = sock.recv(1024) + if not data: + break + full_response += data + # Check for end marker "211 End." + if b'211 End.' in full_response: + break + + # Parse response + response_text = full_response.decode('utf-8', errors='ignore') + lines = response_text.split('\r\n') + + print("S: Full response:") + for line in lines: + if line.strip(): # Print only non-empty lines + print(f" {line}") + + # Find feature lines (between 211- and 211 End.) + feature_lines = [] + in_features = False + + for line in lines: + if line.startswith('211-'): + in_features = True + continue + elif line.startswith('211 '): + break + elif in_features and line.strip(): + feature_lines.append(line) + + print(f"\nParsed feature lines ({len(feature_lines)} lines):") + for line in feature_lines: + print(f" '{line}'") + + # Check for violations + violation_found = False + if not feature_lines: + print("No feature lines found.") + return + + for line in feature_lines: + # Per RFC 2389, each feature line must start with a single space + if not line.startswith(' '): + print(f"\n[!] VIOLATION CONFIRMED: Feature line '{line.strip()}' is missing the required leading space.") + print(f" Original line: '{line}' (length: {len(line)})") + violation_found = True + else: + print(f"[-] Correct format: '{line.strip()}' (starts with space)") + + if not violation_found: + print("\n[-] No violations detected. All feature lines are correctly formatted.") + else: + print(f"\n[!] RFC 2389 violation found: Feature lines must start with a space!") + + except Exception as e: + print(f"\nError occurred: {e}") + import traceback + traceback.print_exc() + finally: + if 'sock' in locals(): + try: + sock.close() + except: + pass + +if __name__ == "__main__": + verify_feat_space_indent() \ No newline at end of file diff --git a/test/test5.py b/test/test5.py new file mode 100644 index 0000000..2b1cb2d --- /dev/null +++ b/test/test5.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Sat Jul 19 15:44:40 2025 + +@author: ugo +""" + +import ftplib + +def test_utf8_mkd(server, port, user, password, dir_name_utf8): + try: + with ftplib.FTP() as ftp: + ftp.connect(server, port, timeout=5) + ftp.login(user, password) + + print(f"C: MKD {dir_name_utf8}") + response = ftp.mkd(dir_name_utf8) + print(f"S: {response}") + + # Clean up after test + ftp.rmd(dir_name_utf8) + print(f"Removed directory: {dir_name_utf8}") + + except ftplib.error_perm as e: + print(f"FTP permission error: {e}") + except Exception as e: + print(f"Error: {e}") + +if __name__ == "__main__": + # Example UTF-8 directory names to test: + test_dirs = [ + "Café", + "测试", + "директория", + "データ", + "résumé" + ] + + # Update with your FTP server info: + FTP_SERVER = "127.0.0.1" + FTP_PORT = 21 + FTP_USER = "username" + FTP_PASS = "password" + + for d in test_dirs: + print("\n--- Testing MKD with UTF-8 directory ---") + test_utf8_mkd(FTP_SERVER, FTP_PORT, FTP_USER, FTP_PASS, d) diff --git a/test/test6.py b/test/test6.py new file mode 100644 index 0000000..a125d58 --- /dev/null +++ b/test/test6.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Sat Jul 19 15:52:21 2025 + +@author: ugo +""" + +import ftplib + +def test_opts_utf8(server, port, user, password): + try: + with ftplib.FTP() as ftp: + ftp.connect(server, port, timeout=5) + ftp.login(user, password) + + # Send OPTS UTF8 ON command + print("C: OPTS UTF8 ON") + response = ftp.sendcmd("OPTS UTF8 ON") + print(f"S: {response}") + + # Check if server responded with 200 + if response.startswith('200'): + print("OPTS UTF8 command is supported and accepted.") + else: + print("OPTS UTF8 command is not supported or rejected.") + + except ftplib.error_perm as e: + print(f"FTP permission error: {e}") + except Exception as e: + print(f"Error: {e}") + +if __name__ == "__main__": + FTP_SERVER = "127.0.0.1" + FTP_PORT = 21 + FTP_USER = "username" + FTP_PASS = "password" + + test_opts_utf8(FTP_SERVER, FTP_PORT, FTP_USER, FTP_PASS) diff --git a/test/test7.py b/test/test7.py new file mode 100644 index 0000000..66e8aba --- /dev/null +++ b/test/test7.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Sat Jul 19 16:28:50 2025 + +@author: ugo +""" + +import socket +import ssl + +def verify_pbsz_prot_violations(): + """ + Combined verification of multiple sequence and validation violations related to PBSZ and PROT. + """ + HOST = '127.0.0.1' + PORT = 21 + USER = 'username' + PASS = 'password' + + print("\n--- 12-15. Verifying Rules: PBSZ/PROT Command Sequences and Validation ---") + + # Scenario 1: Invoke PBSZ without prior security exchange + print("\n[+] Scenario 1: Invoke PBSZ on plaintext connection") + try: + with socket.create_connection((HOST, PORT), timeout=5) as sock: + sock.recv(1024) + sock.sendall(b'PBSZ 0\r\n') + print("C: PBSZ 0") + response = sock.recv(1024).decode().strip() + print(f"S: {response}") + if response.startswith('200'): + print(" -> VIOLATION CONFIRMED: Server accepted PBSZ without prior security exchange.") + else: + print(" -> No violation detected for this case.") + except Exception as e: + print(f" -> Error occurred: {e}") + + # Scenario 2: Invoke PBSZ with invalid parameters + print("\n[+] Scenario 2: Invoke PBSZ with invalid parameters") + try: + # Need a TLS connection to proceed + sock = socket.create_connection((HOST, PORT), timeout=5) + sock.recv(1024) + sock.sendall(b'AUTH TLS\r\n') + sock.recv(1024) + + # Create SSL context that doesn't verify certificates + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + ssock = context.wrap_socket(sock, server_hostname=HOST) + + ssock.sendall(b'PBSZ not_a_number\r\n') + print("C: PBSZ not_a_number") + response = ssock.recv(1024).decode().strip() + print(f"S: {response}") + if response.startswith('200'): + print(" -> VIOLATION CONFIRMED: Server responded with 200 OK to invalid PBSZ parameter instead of 501.") + else: + print(" -> No violation detected for this case.") + + # Scenario 3: Invoke PROT without calling PBSZ first + print("\n[+] Scenario 3: Invoke PROT on TLS connection without prior PBSZ") + ssock.sendall(b'PROT P\r\n') + print("C: PROT P") + response_prot = ssock.recv(1024).decode().strip() + print(f"S: {response_prot}") + if response_prot.startswith('200'): + print(" -> VIOLATION CONFIRMED: Server accepted PROT without prior PBSZ.") + else: + print(" -> No violation detected for this case.") + + except Exception as e: + print(f"\nError occurred: {e}") + finally: + if 'ssock' in locals(): + ssock.close() + +if __name__ == "__main__": + verify_pbsz_prot_violations() \ No newline at end of file diff --git a/test/test8.py b/test/test8.py new file mode 100644 index 0000000..e93ff50 --- /dev/null +++ b/test/test8.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Sat Jul 19 18:15:07 2025 + +@author: ugo +""" + +import socket +import ssl +import ftplib +from io import BytesIO + +def log_command(direction, command, response=None): + """Log FTP commands and responses""" + if direction == "C": + print(f"C: {command}") + elif direction == "S": + print(f"S: {response}") + elif direction == "ERROR": + print(f"ERROR: {command}") + elif direction == "INFO": + print(f"INFO: {command}") + +def verify_rest_violations(): + """ + Verify REST-related sequence and implementation violations. + """ + HOST = '127.0.0.1' + PORT = 21 + USER = 'username' + PASS = 'password' + FILE = "test_rest.txt" + CONTENT = b"1234567890" + + print("\n--- Verifying Rules: REST Command Sequence and Implementation ---") + print("Test Objective: Verify that the REST command correctly limits subsequent commands and handles offsets") + + try: + with ftplib.FTP() as ftp: + # Enable debug mode to show all FTP interactions + ftp.set_debuglevel(0) # We handle display ourselves + + log_command("INFO", f"Connecting to {HOST}:{PORT}") + ftp.connect(HOST, PORT, timeout=5) + + log_command("C", f"USER {USER}") + ftp.login(USER, PASS) + log_command("INFO", "Login successful") + + log_command("C", "TYPE I") + resp = ftp.sendcmd("TYPE I") + log_command("S", resp) + + # Upload test file + log_command("C", f"STOR {FILE}") + ftp.storbinary(f"STOR {FILE}", BytesIO(CONTENT)) + log_command("INFO", f"File uploaded successfully, content: {CONTENT} (length: {len(CONTENT)})") + + print("\n" + "="*60) + # Scenario 1: Verify if RETR correctly handles the offset set by REST + print("[+] Scenario 1: Verify if RETR correctly handles the offset set by REST") + print(" RFC Requirement: RETR should start transferring from the byte position specified by REST") + + # Reset REST point + rest_offset = 5 + log_command("C", f"REST {rest_offset}") + try: + response_rest2 = ftp.sendcmd(f"REST {rest_offset}") + log_command("S", response_rest2) + + if not response_rest2.startswith('350'): + print(f" REST {rest_offset} failed: {response_rest2}") + print(f" Expected: 350 Restarting at {rest_offset}") + return + else: + print(f" REST set offset to byte {rest_offset}") + except ftplib.error_perm as e: + log_command("ERROR", f"REST {rest_offset} failed: {e}") + return + + # Download file + log_command("C", f"RETR {FILE}") + out = BytesIO() + ftp.retrbinary(f"RETR {FILE}", out.write) + retrieved_content = out.getvalue() + + # Detailed analysis of results + print(f"\n Transfer Result Analysis:") + print(f" Original File Content: {CONTENT} (length: {len(CONTENT)})") + print(f" REST Offset: {rest_offset} (starting from byte {rest_offset})") + print(f" Expected Transferred Content: {CONTENT[rest_offset:]} (length: {len(CONTENT[rest_offset:])})") + print(f" Actual Transferred Content: {retrieved_content} (length: {len(retrieved_content)})") + + if len(retrieved_content) == len(CONTENT): + print("\n VIOLATION CONFIRMED: RETR implementation ignored the offset set by REST") + print(" Actual Result: Downloaded the entire file, ignoring the REST offset") + print(f" Expected Result: Should only download {CONTENT[rest_offset:]} ({len(CONTENT[rest_offset:])} bytes)") + print(" Violation Explanation: Violates RFC 959 regarding the REST command") + elif len(retrieved_content) == len(CONTENT) - rest_offset: + if retrieved_content == CONTENT[rest_offset:]: + print(" No violation detected. RETR correctly handled the REST offset.") + else: + print(" VIOLATION CONFIRMED: Content mismatch, REST offset handled incorrectly") + print(f" Actual Content: {retrieved_content}") + print(f" Expected Content: {CONTENT[rest_offset:]}") + print(" Violation Explanation: Incorrect REST offset calculation") + elif len(retrieved_content) > len(CONTENT): + print(" VIOLATION CONFIRMED: Retrieved data length exceeds original file size") + print(f" Actual Length: {len(retrieved_content)} bytes") + print(f" Expected Maximum Length: {len(CONTENT)} bytes") + print(f" Expected Length after REST: {len(CONTENT[rest_offset:])} bytes") + print(" Violation Explanation: Severe data transfer anomaly") + else: + print(f" VIOLATION CONFIRMED: Unexpected retrieved content length") + print(f" Actual Length: {len(retrieved_content)} bytes") + print(f" Expected Length: {len(CONTENT[rest_offset:])} bytes") + print(" Violation Explanation: Problem with REST command implementation") + + # Clean up test file + try: + log_command("C", f"DELE {FILE}") + #ftp.delete(FILE) + log_command("INFO", "Test file cleanup completed") + except: + log_command("ERROR", "Failed to cleanup test file") + + except Exception as e: + print(f"\nError occurred: {e}") + import traceback + print("Detailed error information:") + traceback.print_exc() + print("\nPossible causes:") + print(" - Server connection issues") + print(" - Insufficient permissions") + print(" - Server does not support certain commands") + +if __name__ == "__main__": + verify_rest_violations() \ No newline at end of file