fix: rest command support + integration tests

This commit is contained in:
Ugo
2025-07-20 15:12:22 +01:00
parent eac95c1129
commit c4ab90b358
14 changed files with 1362 additions and 83 deletions

View File

@ -55,6 +55,7 @@
/* Private function definition */
static int processCommand(int processingElement, ftpDataType *ftpData);
static void memoryDebug(ftpDataType *ftpData);
static int isTransferCommand(int processingElement, ftpDataType *ftpData);
void evaluateControlChannel(ftpDataType *ftpData)
{
@ -235,6 +236,22 @@ static void memoryDebug(ftpDataType *ftpData)
}
}
static int isTransferCommand(int processingElement, ftpDataType *ftpData)
{
if (IS_CMD(ftpData->clients[processingElement].theCommandReceived, "RETR") ||
IS_CMD(ftpData->clients[processingElement].theCommandReceived, "STOR") ||
IS_CMD(ftpData->clients[processingElement].theCommandReceived, "PASV") ||
IS_CMD(ftpData->clients[processingElement].theCommandReceived, "TYPE I") ||
IS_CMD(ftpData->clients[processingElement].theCommandReceived, "TYPE A") ||
IS_CMD(ftpData->clients[processingElement].theCommandReceived, "TYPE F") ||
IS_CMD(ftpData->clients[processingElement].theCommandReceived, "APPE"))
{
return 1;
}
return 0;
}
static int processCommand(int processingElement, ftpDataType *ftpData)
{
//command handler structure
@ -291,6 +308,13 @@ static int processCommand(int processingElement, ftpDataType *ftpData)
//printTimeStamp();
my_printf("\nCommand received from (%d): %s", processingElement, ftpData->clients[processingElement].theCommandReceived);
if(!isTransferCommand(processingElement, ftpData) &&
ftpData->clients[processingElement].workerData.retrRestartAtByte != 0)
{
my_printf("Reset: retrRestartAtByte");
ftpData->clients[processingElement].workerData.retrRestartAtByte = 0;
}
cleanDynamicStringDataType(&ftpData->clients[processingElement].ftpCommand.commandArgs, 0, ftpData->clients[processingElement].memoryTable);
cleanDynamicStringDataType(&ftpData->clients[processingElement].ftpCommand.commandOps, 0, ftpData->clients[processingElement].memoryTable);

View File

@ -133,92 +133,88 @@ void workerCleanup(cleanUpWorkerArgs *args)
static int processStorAppe(cleanUpWorkerArgs *args)
{
ftpDataType *ftpData = args->ftpData;
int theSocketId = args->socketId;
int returnCode = 0;
int theSocketId = args->socketId;
int returnCode = 0;
off_t restartPos = ftpData->clients[theSocketId].workerData.retrRestartAtByte;
FILE *file = NULL;
if (compareStringCaseInsensitive(ftpData->clients[theSocketId].workerData.theCommandReceived, "APPE", strlen("APPE")) == 1)
{
#ifdef LARGE_FILE_SUPPORT_ENABLED
//#warning LARGE FILE SUPPORT IS ENABLED!
ftpData->clients[theSocketId].workerData.theStorFile = fopen64(ftpData->clients[theSocketId].fileToStor.text, "ab");
#endif
const char *filePath = ftpData->clients[theSocketId].fileToStor.text;
const char *command = ftpData->clients[theSocketId].workerData.theCommandReceived;
#ifndef LARGE_FILE_SUPPORT_ENABLED
#warning LARGE FILE SUPPORT IS NOT ENABLED!
ftpData->clients[theSocketId].workerData.theStorFile = fopen(ftpData->clients[theSocketId].fileToStor.text, "ab");
#endif
}
else
{
#ifdef LARGE_FILE_SUPPORT_ENABLED
//#warning LARGE FILE SUPPORT IS ENABLED!
ftpData->clients[theSocketId].workerData.theStorFile = fopen64(ftpData->clients[theSocketId].fileToStor.text, "wb");
#endif
int isAppe = compareStringCaseInsensitive(command, "APPE", strlen("APPE")) == 1;
#ifndef LARGE_FILE_SUPPORT_ENABLED
#warning LARGE FILE SUPPORT IS NOT ENABLED!
ftpData->clients[theSocketId].workerData.theStorFile = fopen(ftpData->clients[theSocketId].fileToStor.text, "wb");
#endif
}
#ifdef LARGE_FILE_SUPPORT_ENABLED
if (isAppe) {
file = fopen64(filePath, "ab");
} else if (restartPos > 0) {
file = fopen64(filePath, "r+b");
} else {
file = fopen64(filePath, "wb");
}
#else
if (isAppe) {
file = fopen(filePath, "ab");
} else if (restartPos > 0) {
file = fopen(filePath, "r+b");
} else {
file = fopen(filePath, "wb");
}
#endif
if (ftpData->clients[theSocketId].workerData.theStorFile == NULL)
{
ftpData->clients[theSocketId].workerData.theStorFile = file;
if (file == NULL) {
returnCode = socketPrintf(ftpData, theSocketId, "s", "553 Unable to write the file\r\n");
if (returnCode <= 0)
{
if (returnCode <= 0) {
ftpData->clients[theSocketId].closeTheClient = 1;
LOG_ERROR("socketPrintf");
LOG_ERROR("socketPrintf");
my_printf("\n Closing the client 6");
return -1;
}
return -1;
}
while(1)
{
if (ftpData->clients[theSocketId].dataChannelIsTls != 1)
{
ftpData->clients[theSocketId].workerData.bufferIndex = read(ftpData->clients[theSocketId].workerData.socketConnection, ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE);
}
else if (ftpData->clients[theSocketId].dataChannelIsTls == 1)
{
#ifdef OPENSSL_ENABLED
if (ftpData->clients[theSocketId].workerData.passiveModeOn == 1)
ftpData->clients[theSocketId].workerData.bufferIndex = SSL_read(ftpData->clients[theSocketId].workerData.serverSsl, ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE);
else if(ftpData->clients[theSocketId].workerData.activeModeOn == 1)
ftpData->clients[theSocketId].workerData.bufferIndex = SSL_read(ftpData->clients[theSocketId].workerData.clientSsl, ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE);
#endif
}
else
{
my_printf("\nError state");
}
if (!isAppe && restartPos > 0) {
fseeko(file, restartPos, SEEK_SET);
ftpData->clients[theSocketId].workerData.retrRestartAtByte = 0;
}
if (ftpData->clients[theSocketId].workerData.bufferIndex == 0)
{
break;
while (1) {
int bytesRead = 0;
if (ftpData->clients[theSocketId].dataChannelIsTls != 1) {
bytesRead = read(ftpData->clients[theSocketId].workerData.socketConnection,
ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE);
}
else if (ftpData->clients[theSocketId].workerData.bufferIndex > 0)
{
fwrite(ftpData->clients[theSocketId].workerData.buffer, ftpData->clients[theSocketId].workerData.bufferIndex, 1, ftpData->clients[theSocketId].workerData.theStorFile);
#ifdef OPENSSL_ENABLED
else {
if (ftpData->clients[theSocketId].workerData.passiveModeOn == 1) {
bytesRead = SSL_read(ftpData->clients[theSocketId].workerData.serverSsl,
ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE);
} else if (ftpData->clients[theSocketId].workerData.activeModeOn == 1) {
bytesRead = SSL_read(ftpData->clients[theSocketId].workerData.clientSsl,
ftpData->clients[theSocketId].workerData.buffer, CLIENT_BUFFER_STRING_SIZE);
}
}
#endif
if (bytesRead == 0) {
break;
} else if (bytesRead > 0) {
fwrite(ftpData->clients[theSocketId].workerData.buffer, bytesRead, 1, file);
usleep(100);
ftpData->clients[theSocketId].lastActivityTimeStamp = (int)time(NULL);
}
else if (ftpData->clients[theSocketId].workerData.bufferIndex < 0)
{
} else {
break;
}
}
int theReturnCode;
theReturnCode = fclose(ftpData->clients[theSocketId].workerData.theStorFile);
fclose(file);
ftpData->clients[theSocketId].workerData.theStorFile = NULL;
if (ftpData->clients[theSocketId].login.ownerShip.ownerShipSet == 1)
{
FILE_doChownFromUidGid(ftpData->clients[theSocketId].fileToStor.text, ftpData->clients[theSocketId].login.ownerShip.uid, ftpData->clients[theSocketId].login.ownerShip.gid);
if (ftpData->clients[theSocketId].login.ownerShip.ownerShipSet == 1) {
FILE_doChownFromUidGid(filePath, ftpData->clients[theSocketId].login.ownerShip.uid,
ftpData->clients[theSocketId].login.ownerShip.gid);
}
ftpData->clients[theSocketId].workerData.commandProcessed = 1;
@ -227,6 +223,7 @@ static int processStorAppe(cleanUpWorkerArgs *args)
return 1;
}
static int acceptConnection(cleanUpWorkerArgs *args)
{
ftpDataType *ftpData = args->ftpData;
@ -387,6 +384,8 @@ static int processRetr(cleanUpWorkerArgs *args)
int returnCode = 0;
long long int writenSize = 0, writeReturn = 0;
my_printf("\n ftpData->clients[theSocketId].workerData.retrRestartAtByte = %d", ftpData->clients[theSocketId].workerData.retrRestartAtByte);
writenSize = writeRetrFile(ftpData, theSocketId, ftpData->clients[theSocketId].workerData.retrRestartAtByte, ftpData->clients[theSocketId].workerData.theStorFile);
ftpData->clients[theSocketId].workerData.retrRestartAtByte = 0;

View File

@ -2239,17 +2239,13 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st
{
long long int readen = 0;
long long int toReturn = 0, writtenSize = 0;
long long int currentPosition = 0;
long long int theFileSize;
char buffer[FTP_COMMAND_ELABORATE_CHAR_BUFFER];
memset(buffer, 0, FTP_COMMAND_ELABORATE_CHAR_BUFFER);
#ifdef LARGE_FILE_SUPPORT_ENABLED
// #warning LARGE FILE SUPPORT IS ENABLED!
retrFP = fopen64(data->clients[theSocketId].fileToRetr.text, "rb");
#endif
#ifndef LARGE_FILE_SUPPORT_ENABLED
#warning LARGE FILE SUPPORT IS NOT ENABLED!
#else
retrFP = fopen(data->clients[theSocketId].fileToRetr.text, "rb");
#endif
@ -2259,29 +2255,29 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st
}
theFileSize = FILE_GetFileSize(retrFP);
my_printf("\ntheFileSize %lld", theFileSize);
if (startFrom > 0)
{
my_printf("\nCursor moved %lld", startFrom);
#ifdef LARGE_FILE_SUPPORT_ENABLED
// #warning LARGE FILE SUPPORT IS ENABLED!
currentPosition = (long long int)lseek64(fileno(retrFP), startFrom, SEEK_SET);
if (fseeko64(retrFP, startFrom, SEEK_SET) != 0)
#else
if (fseeko(retrFP, startFrom, SEEK_SET) != 0)
#endif
#ifndef LARGE_FILE_SUPPORT_ENABLED
#warning LARGE FILE SUPPORT IS NOT ENABLED!
currentPosition = (long long int)lseek(fileno(retrFP), startFrom, SEEK_SET);
#endif
if (currentPosition == -1)
{
fclose(retrFP);
retrFP = NULL;
return toReturn;
}
my_printf("\ncurrentPosition %lld", startFrom);
}
while ((readen = (long long int)fread(buffer, sizeof(char), FTP_COMMAND_ELABORATE_CHAR_BUFFER, retrFP)) > 0)
{
my_printf("\nTRANSFER read %lld bytes: %.*s", readen, (int)readen, buffer);
if (data->clients[theSocketId].dataChannelIsTls != 1)
{
@ -2289,7 +2285,6 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st
}
else
{
#ifdef OPENSSL_ENABLED
if (data->clients[theSocketId].workerData.passiveModeOn == 1)
writtenSize = SSL_write(data->clients[theSocketId].workerData.serverSsl, buffer, readen);
@ -2300,7 +2295,6 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st
if (writtenSize <= 0)
{
my_printf("\nError %lld while writing retr file.", writtenSize);
fclose(retrFP);
retrFP = NULL;
@ -2308,7 +2302,7 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st
}
else
{
toReturn = toReturn + writtenSize;
toReturn += writtenSize;
data->clients[theSocketId].lastActivityTimeStamp = (int)time(NULL);
}
}
@ -2317,6 +2311,8 @@ long long int writeRetrFile(ftpDataType *data, int theSocketId, long long int st
return toReturn;
}
char *getFtpCommandArg(char *theCommand, char *theCommandString, int skipArgs)
{
char *toReturn = theCommandString + strlen(theCommand);

View File

@ -702,7 +702,7 @@ void resetWorkerData(ftpDataType *data, int clientId, int isInitialization)
data->clients[clientId].workerData.socketConnection = 0;
data->clients[clientId].workerData.bufferIndex = 0;
data->clients[clientId].workerData.commandReceived = 0;
data->clients[clientId].workerData.retrRestartAtByte = 0;
// data->clients[clientId].workerData.retrRestartAtByte = 0;
data->clients[clientId].workerData.threadIsAlive = 0;
data->clients[clientId].workerData.activeModeOn = 0;
data->clients[clientId].workerData.extendedPassiveModeOn = 0;

644
test/integration.py Normal file
View File

@ -0,0 +1,644 @@
import unittest
from ftplib import FTP, error_perm, error_temp
import os
from io import BytesIO
import time
import ssl
import socket
import ftplib
FTP_HOST = '127.0.0.1'
FTP_PORT = 21
FTP_USER = 'username'
FTP_PASS = 'password'
TEST_FILENAME = 'test.txt'
TEST_CONTENT = b'1234567890' # 10 bytes known content
DOWNLOAD_FILENAME = 'downloaded_test.txt'
UPLOAD_FILENAME = 'upload_test.txt'
RESUME_FILENAME = 'resume_test.txt'
class FTPServerRFCComplianceTests(unittest.TestCase):
def setUp(self):
# Create local test file first
with open(TEST_FILENAME, 'wb') as f:
f.write(TEST_CONTENT)
# Connect and login
self.ftp = FTP()
self.ftp.connect(FTP_HOST, FTP_PORT, timeout=10)
self.ftp.login(FTP_USER, FTP_PASS)
# Upload file to FTP server
with open(TEST_FILENAME, 'rb') as f:
self.ftp.storbinary(f"STOR {TEST_FILENAME}", f)
def tearDown(self):
try:
# Remove test files from server if they exist
for fname in [TEST_FILENAME, UPLOAD_FILENAME, RESUME_FILENAME]:
try:
self.ftp.delete(fname)
except Exception:
pass
self.ftp.quit()
except Exception:
self.ftp.close()
# Remove local temp files
for fname in [TEST_FILENAME, DOWNLOAD_FILENAME, UPLOAD_FILENAME]:
if os.path.exists(fname):
os.remove(fname)
def test_stor_write(self):
original_data = b'abcdefghij'
with open(RESUME_FILENAME, 'wb') as f:
f.write(original_data)
with open(RESUME_FILENAME, 'rb') as f:
self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f)
downloaded = BytesIO()
self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write)
self.assertEqual(downloaded.getvalue(), original_data)
def test_stor_resume_with_rest(self):
original_data = b'abcdefghij'
resume_data = b'XYZ'
expected_data = b'abcdeXYZij'
with open(RESUME_FILENAME, 'wb') as f:
f.write(original_data)
with open(RESUME_FILENAME, 'rb') as f:
self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f)
self.ftp.sendcmd('REST 5')
self.ftp.storbinary(f'STOR {RESUME_FILENAME}', BytesIO(resume_data))
downloaded = BytesIO()
self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write)
self.assertEqual(downloaded.getvalue(), expected_data)
def test_appe_append(self):
initial_data = b'12345'
append_data = b'67890'
expected_data = b'1234567890'
with open(RESUME_FILENAME, 'wb') as f:
f.write(initial_data)
with open(RESUME_FILENAME, 'rb') as f:
self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f)
self.ftp.storbinary(f'APPE {RESUME_FILENAME}', BytesIO(append_data))
downloaded = BytesIO()
self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write)
self.assertEqual(downloaded.getvalue(), expected_data)
def test_220_service_ready(self):
welcome = self.ftp.getwelcome()
self.assertTrue(welcome.startswith('220'), f"Expected 220 response, got: {welcome}")
def test_user_login_success(self):
resp = self.ftp.login(user=FTP_USER, passwd=FTP_PASS)
self.assertTrue(resp.startswith('230'), f"Login failed or unexpected response: {resp}")
def test_user_login_fail(self):
with self.assertRaises((error_perm, error_temp)):
self.ftp.login(user='wronguser', passwd='wrongpass')
def test_pwd_and_cwd(self):
pwd = self.ftp.pwd()
self.assertTrue(pwd.startswith('/'), f"PWD should return directory path, got: {pwd}")
resp = self.ftp.cwd('/')
self.assertTrue(resp.startswith('250'), f"CWD should succeed with 250 response, got: {resp}")
def Disabledtest_utf8_mkd(self):
"""Verify server accepts MKD command with UTF-8 directory names."""
test_dirs = [
"Café",
"测试",
"директория",
"データ",
"résumé"
]
with ftplib.FTP() as ftp:
ftp.connect(FTP_HOST, FTP_PORT, timeout=5)
ftp.login(FTP_USER, FTP_PASS)
for dir_name_utf8 in test_dirs:
with self.subTest(dir=dir_name_utf8):
print(f"C: MKD {dir_name_utf8}")
response = ftp.mkd(dir_name_utf8)
print(f"S: {response}")
self.assertTrue(response.startswith('257'), f"MKD failed for directory '{dir_name_utf8}'")
# Clean up after test
ftp.rmd(dir_name_utf8)
print(f"Removed directory: {dir_name_utf8}")
def test_pbsz_prot_violations(self):
HOST = '127.0.0.1'
PORT = 21
print("\n--- 12-15. Verifying Rules: PBSZ/PROT Command Sequences and Validation ---")
# Scenario 1: Invoke PBSZ without prior security exchange (plaintext)
print("\n[+] Scenario 1: Invoke PBSZ on plaintext connection")
with socket.create_connection((HOST, PORT), timeout=5) as sock:
banner = sock.recv(1024).decode()
print(f"S: {banner.strip()}")
sock.sendall(b'PBSZ 0\r\n')
print("C: PBSZ 0")
response = sock.recv(1024).decode().strip()
print(f"S: {response}")
self.assertNotEqual(response.startswith('200'), True, "Violation: PBSZ accepted without security.")
# Scenario 2 & 3: Use TLS connection for further checks
print("\n[+] Scenario 2: Invoke PBSZ with invalid parameters")
sock = socket.create_connection((HOST, PORT), timeout=5)
banner = sock.recv(1024).decode()
print(f"S: {banner.strip()}")
sock.sendall(b'AUTH TLS\r\n')
auth_response = sock.recv(1024).decode().strip()
print(f"C: AUTH TLS\nS: {auth_response}")
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
ssock = context.wrap_socket(sock, server_hostname=HOST)
try:
ssock.sendall(b'PBSZ not_a_number\r\n')
print("C: PBSZ not_a_number")
response = ssock.recv(1024).decode().strip()
print(f"S: {response}")
self.assertNotEqual(response.startswith('200'), True,
"Violation: Server accepted invalid PBSZ parameter.")
print("\n[+] Scenario 3: Invoke PROT on TLS connection without prior PBSZ")
ssock.sendall(b'PROT P\r\n')
print("C: PROT P")
response_prot = ssock.recv(1024).decode().strip()
print(f"S: {response_prot}")
self.assertNotEqual(response_prot.startswith('200'), True,
"Violation: Server accepted PROT without prior PBSZ.")
finally:
ssock.close()
def Disabtest_utf8_mkd_with_opts(self):
dir_name_utf8 = "rés"
try:
with ftplib.FTP() as ftp:
ftp.connect(FTP_HOST, FTP_PORT, timeout=5)
ftp.login(FTP_USER, FTP_PASS)
# Enable UTF-8 mode on the server
print("C: OPTS UTF8 ON")
response = ftp.sendcmd("OPTS UTF8 ON")
print(f"S: {response}")
if not response.startswith('200'):
print("Warning: OPTS UTF8 not supported or rejected by server.")
# Now try to create directory with UTF-8 name
print(f"C: MKD {dir_name_utf8}")
response = ftp.mkd(dir_name_utf8)
print(f"S: {response}")
print(f"Successfully created UTF-8 directory: {dir_name_utf8}")
except ftplib.error_perm as e:
print(f"FTP permission error creating directory '{dir_name_utf8}': {e}")
except Exception as e:
print(f"Error: {e}")
def test_feat_space_indent(self):
"""
Verify whether each feature in the FEAT response correctly starts with a single space.
"""
HOST = '127.0.0.1'
PORT = 21
USER = 'username'
PASS = 'password'
print("\n--- 8. Verifying Rule: Missing Space Before FEAT Features ---")
try:
sock = socket.create_connection((HOST, PORT), timeout=10)
welcome = sock.recv(1024).decode().strip()
print(f"S: {welcome}")
sock.sendall(f'USER {USER}\r\n'.encode())
resp = sock.recv(1024).decode().strip()
print(f"C: USER {USER}\nS: {resp}")
sock.sendall(f'PASS {PASS}\r\n'.encode())
resp = sock.recv(1024).decode().strip()
print(f"C: PASS {PASS}\nS: {resp}")
self.assertTrue(resp.startswith('230'), "Login failed")
sock.sendall(b'FEAT\r\n')
full_response = b''
while True:
data = sock.recv(1024)
if not data:
break
full_response += data
if b'211 End.' in full_response:
break
response_text = full_response.decode('utf-8', errors='ignore')
lines = response_text.split('\r\n')
feature_lines = []
in_features = False
for line in lines:
if line.startswith('211-'):
in_features = True
continue
elif line.startswith('211 '):
break
elif in_features and line.strip():
feature_lines.append(line)
self.assertTrue(len(feature_lines) > 0, "No feature lines found in FEAT response")
for line in feature_lines:
self.assertTrue(line.startswith(' '), f"Feature line does not start with a space: '{line}'")
except Exception as e:
self.fail(f"Exception occurred during FEAT space indent verification: {e}")
finally:
if 'sock' in locals():
try:
sock.close()
except:
pass
def test_pasv_retr(self):
self.ftp.login(FTP_USER, FTP_PASS)
self.ftp.set_pasv(True)
with open(DOWNLOAD_FILENAME, 'wb') as f:
self.ftp.retrbinary(f'RETR {TEST_FILENAME}', f.write)
self.assertTrue(os.path.exists(DOWNLOAD_FILENAME), "Downloaded file does not exist")
# Check downloaded file size is > 0
self.assertGreater(os.path.getsize(DOWNLOAD_FILENAME), 0, "Downloaded file is empty")
def test_active_retr(self):
self.ftp.set_pasv(False)
try:
with open(DOWNLOAD_FILENAME, 'wb') as f:
self.ftp.retrbinary(f'RETR {TEST_FILENAME}', f.write)
except EOFError:
self.skipTest("Active mode data connection failed")
self.assertTrue(os.path.exists(DOWNLOAD_FILENAME), "Downloaded file does not exist")
def test_rest_command(self):
with open(TEST_FILENAME, 'rb') as f:
f.seek(5)
expected_data = f.read()
self.ftp.sendcmd('REST 5')
data = bytearray()
self.ftp.retrbinary(f'RETR {TEST_FILENAME}', data.extend)
self.assertEqual(data, expected_data, f"REST transfer mismatch.\nExpected: {expected_data!r}\nActual : {data!r}")
def log_command(direction, command, response=None):
"""Log FTP commands and responses"""
if direction == "C":
print(f"C: {command}")
elif direction == "S":
print(f"S: {response}")
elif direction == "ERROR":
print(f"ERROR: {command}")
elif direction == "INFO":
print(f"INFO: {command}")
def test_rest_stor_resume_explicit(self):
"""
Test resuming an upload using REST + STOR.
This should append or overwrite starting from the specified offset.
"""
original_data = b'abcdefghij' # 10 bytes
resume_data = b'XYZ' # Will overwrite at offset 5 → expect: abcdeXYZij
expected_data = b'abcdeXYZij'
# Step 1: Upload original data
with open(RESUME_FILENAME, 'wb') as f:
f.write(original_data)
with open(RESUME_FILENAME, 'rb') as f:
self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f)
# Step 2: Resume upload from offset 5 with 'XYZ'
self.ftp.sendcmd('REST 5')
self.ftp.storbinary(f'STOR {RESUME_FILENAME}', BytesIO(resume_data))
# Step 3: Download and verify final file content
downloaded = BytesIO()
self.ftp.retrbinary(f'RETR {RESUME_FILENAME}', downloaded.write)
result = downloaded.getvalue()
self.assertEqual(result, expected_data,
f"REST+STOR resume failed.\nExpected: {expected_data}\nActual : {result}")
def test_rest_violations(self):
FILE = "test_rest.txt"
CONTENT = b"1234567890"
print("\n--- Verifying Rules: REST Command Sequence and Implementation ---")
print("Test Objective: Verify that the REST command correctly limits subsequent commands and handles offsets")
try:
with ftplib.FTP() as ftp:
ftp.set_debuglevel(0)
ftp.connect(FTP_HOST, FTP_PORT, timeout=5)
ftp.login(FTP_USER, FTP_PASS)
self.log_command("INFO", "Login successful")
self.log_command("C", "TYPE I")
resp = ftp.sendcmd("TYPE I")
self.log_command("S", resp)
self.log_command("C", f"STOR {FILE}")
ftp.storbinary(f"STOR {FILE}", BytesIO(CONTENT))
self.log_command("INFO", f"File uploaded successfully, content: {CONTENT} (length: {len(CONTENT)})")
print("\n" + "="*60)
print("[+] Scenario 1: Verify if RETR correctly handles the offset set by REST")
print(" RFC Requirement: RETR should start transferring from the byte position specified by REST")
rest_offset = 5
self.log_command("C", f"REST {rest_offset}")
response_rest2 = ftp.sendcmd(f"REST {rest_offset}")
self.log_command("S", response_rest2)
self.assertTrue(response_rest2.startswith('350'), f"REST {rest_offset} failed: {response_rest2}")
self.log_command("C", f"RETR {FILE}")
out = BytesIO()
ftp.retrbinary(f"RETR {FILE}", out.write)
retrieved_content = out.getvalue()
print(f"\n Transfer Result Analysis:")
print(f" Original File Content: {CONTENT} (length: {len(CONTENT)})")
print(f" REST Offset: {rest_offset} (starting from byte {rest_offset})")
print(f" Expected Transferred Content: {CONTENT[rest_offset:]} (length: {len(CONTENT[rest_offset:])})")
print(f" Actual Transferred Content: {retrieved_content} (length: {len(retrieved_content)})")
# Assertion based on REST behavior
if len(retrieved_content) == len(CONTENT):
self.fail("RETR ignored REST offset: entire file downloaded.")
elif len(retrieved_content) == len(CONTENT) - rest_offset and retrieved_content == CONTENT[rest_offset:]:
print("No violation detected. RETR correctly handled the REST offset.")
else:
self.fail("REST offset handled incorrectly or data mismatch.")
try:
self.log_command("C", f"DELE {FILE}")
# ftp.delete(FILE) # Uncomment if delete supported by server
self.log_command("INFO", "Test file cleanup completed")
except Exception:
self.log_command("ERROR", "Failed to cleanup test file")
except Exception as e:
self.fail(f"Error during REST violation test: {e}")
def test_rest_stor_resume(self):
with open(UPLOAD_FILENAME, 'wb') as f:
f.write(TEST_CONTENT)
with open(UPLOAD_FILENAME, 'rb') as f:
self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f)
self.ftp.sendcmd('REST 5')
with open(UPLOAD_FILENAME, 'rb') as f:
f.seek(5)
resp = self.ftp.storbinary(f'STOR {RESUME_FILENAME}', f)
self.assertTrue(resp.startswith('226') or resp.startswith('250'),
f"STOR after REST failed: {resp}")
def test_rest_cleared_after_retr(self):
self.ftp.sendcmd('REST 5')
self.ftp.retrbinary(f'RETR {TEST_FILENAME}', lambda _: None) # discard output
# Next RETR should start from 0
data = bytearray()
self.ftp.retrbinary(f'RETR {TEST_FILENAME}', data.extend)
with open(TEST_FILENAME, 'rb') as f:
expected = f.read()
self.assertEqual(data, expected, "REST offset wasn't cleared after first RETR")
def test_stor_upload(self):
with open(UPLOAD_FILENAME, 'wb') as f:
f.write(b'This is test upload content.')
with open(UPLOAD_FILENAME, 'rb') as f:
resp = self.ftp.storbinary(f'STOR {UPLOAD_FILENAME}', f)
self.assertTrue(resp.startswith('226') or resp.startswith('250'),
f"STOR command should succeed, got: {resp}")
def test_ccc_without_prereq(self):
"""Verify that the server rejects CCC command sent without an active TLS session."""
try:
with FTP() as ftp:
ftp.connect(FTP_HOST, FTP_PORT, timeout=5)
ftp.login(FTP_USER, FTP_PASS)
# Send CCC command in plaintext session
response = ftp.voidcmd('CCC')
# If the server responds with 200, that is a violation
self.assertFalse(response.startswith('200'),
f"Server incorrectly accepted CCC command without TLS: {response}")
except error_perm as e:
# Expected behavior: server rejects CCC in plaintext with 5xx error
self.assertTrue(str(e).startswith('5'), f"Unexpected permission error response: {e}")
except Exception as e:
self.fail(f"Unexpected exception during CCC command test: {e}")
def test_ccc_unprotected_reply(self):
"""
Verify if the server returns the correct 533 response code
when CCC is sent over an unprotected connection.
"""
try:
with FTP() as ftp:
ftp.connect(FTP_HOST, FTP_PORT, timeout=5)
ftp.login(FTP_USER, FTP_PASS)
response = ftp.voidcmd('CCC')
except error_perm as e:
response = str(e)
except Exception as e:
self.fail(f"Unexpected exception during CCC command test: {e}")
# The RFC requires the server to respond with 533 if CCC is disallowed in the current context
if '533' in response:
pass # Correct behavior
elif response.startswith('200') or response.startswith('502'):
self.fail(f"Violation: Server responded with '{response.split()[0]}' instead of '533'.")
else:
self.fail(f"Unexpected server response: {response}")
def test_rest_offset_compliance(self):
"""
Verify that RETR respects REST offset and doesn't transfer the whole file.
"""
self.ftp.login(FTP_USER, FTP_PASS)
self.ftp.sendcmd("TYPE I")
FILE = "test_rest.txt"
CONTENT = b"1234567890"
rest_offset = 5
# Upload known content
self.ftp.storbinary(f"STOR {FILE}", BytesIO(CONTENT))
# Set REST offset
resp = self.ftp.sendcmd(f"REST {rest_offset}")
self.assertTrue(resp.startswith('350'), f"Expected 350 response to REST, got: {resp}")
# RETR after REST
out = BytesIO()
self.ftp.retrbinary(f"RETR {FILE}", out.write)
retrieved_content = out.getvalue()
expected = CONTENT[rest_offset:]
# Validate
self.assertEqual(retrieved_content, expected,
f"REST offset not respected.\nExpected: {expected!r}\nActual : {retrieved_content!r}")
# Clean up (optional, since tearDown removes test files)
try:
self.ftp.delete(FILE)
except:
pass
def test_auth_reauth_violation(self):
"""
Verify whether the server enforces re-authentication after re-issuing the AUTH command.
"""
HOST = FTP_HOST
PORT = FTP_PORT
USER = FTP_USER
PASS = FTP_PASS
print("\n--- Verifying Rule: Re-authentication Required After Re-issuing AUTH Command ---")
sock = None
ssock = None
try:
# Step 1: Connect and log in in plaintext
sock = socket.create_connection((HOST, PORT), timeout=5)
welcome = sock.recv(1024).decode().strip()
print(f"S: {welcome}")
sock.sendall(f'USER {USER}\r\n'.encode())
print(f"C: USER {USER}")
response_user = sock.recv(1024).decode().strip()
print(f"S: {response_user}")
sock.sendall(f'PASS {PASS}\r\n'.encode())
print(f"C: PASS {'*' * len(PASS)}")
response_login = sock.recv(1024).decode().strip()
print(f"S: {response_login}")
self.assertTrue(response_login.startswith('230'), "Login failed; test cannot continue.")
# Step 2: Verify logged-in status
sock.sendall(b'PWD\r\n')
print("C: PWD")
response_pwd1 = sock.recv(1024).decode().strip()
print(f"S: {response_pwd1}")
self.assertTrue(response_pwd1.startswith('257'), "PWD command failed; cannot confirm login status.")
# Step 3: Re-issue AUTH TLS command to reset security state
sock.sendall(b'AUTH TLS\r\n')
print("C: AUTH TLS")
response_auth = sock.recv(1024).decode().strip()
print(f"S: {response_auth}")
self.assertTrue(response_auth.startswith('234'), f"AUTH TLS command failed: {response_auth}")
# Wrap socket in SSL context (skip certificate verification)
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
ssock = context.wrap_socket(sock, server_hostname=HOST)
print(" -> TLS handshake successful; connection is encrypted.")
# Step 4: Attempt authenticated command again without re-authenticating
ssock.sendall(b'PWD\r\n')
print("C: PWD")
response_pwd2 = ssock.recv(1024).decode().strip()
print(f"S: {response_pwd2}")
# Analyze result
if response_pwd2.startswith('257'):
self.fail("VIOLATION: Server accepted PWD command after AUTH TLS without re-authentication!")
elif response_pwd2.startswith('530'):
# Correct behavior: require login after re-auth
pass
else:
self.fail(f"Unexpected server response after AUTH TLS: {response_pwd2}")
except Exception as e:
self.fail(f"Exception occurred during AUTH re-auth test: {e}")
finally:
if ssock:
ssock.close()
elif sock:
sock.close()
def test_quit(self):
resp = self.ftp.quit()
self.assertTrue(resp.startswith('221'), f"QUIT should respond with 221, got: {resp}")
def Disabledtest_invalid_command(self):
try:
resp = self.ftp.sendcmd('FOOBAR')
self.assertTrue(resp.startswith('500') or resp.startswith('502'),
f"Invalid command should respond with 5xx error, got: {resp}")
except error_perm as e:
self.assertTrue(str(e).startswith('500') or str(e).startswith('502'),
f"Invalid command error should start with 5xx, got: {e}")
if __name__ == '__main__':
unittest.main()

1
test/resume_test.txt Normal file
View File

@ -0,0 +1 @@
abcdefghij

45
test/test1.py Normal file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 19 09:09:55 2025
@author: ugo
"""
import ftplib
import socket
def verify_ccc_no_prereq_check():
"""python
Send CCC directly in an unencrypted session to verify if the server handles it incorrectly.
"""
HOST = '127.0.0.1'
PORT = 21
USER = 'username'
PASS = 'password'
print("\n--- 2. Verifying Rule: CCC Without Preceding Security Exchange Check ---")
try:
with ftplib.FTP() as ftp:
ftp.connect(HOST, PORT, timeout=5)
ftp.login(USER, PASS)
print("Successfully logged in in a plaintext session.")
print("C: CCC")
# A non-compliant server will return 200 OK, indicating it accepted the command
response = ftp.voidcmd('CCC')
print(f"S: {response}")
if response.startswith('200'):
print("\n[!] VIOLATION CONFIRMED: Server accepted CCC command without an active TLS session.")
else:
print("\n[-] No violation detected or server returned an unexpected response.")
except ftplib.error_perm as e:
print(f"[-] Server correctly rejected the command. No violation. Server response: {e}")
except Exception as e:
print(f"\nError occurred: {e}")
if __name__ == "__main__":
verify_ccc_no_prereq_check()

48
test/test2.py Normal file
View File

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 19 09:20:47 2025
@author: ugo
"""
import ftplib
import socket
def verify_ccc_unprotected_reply():
"""
Verify if the server returns an incorrect response code when CCC is sent over an unprotected connection.
"""
HOST = '127.0.0.1'
PORT = 21
USER = 'username'
PASS = 'password'
print("\n--- 3. Verifying Rule: Response Code for Unprotected CCC ---")
try:
with ftplib.FTP() as ftp:
ftp.connect(HOST, PORT, timeout=5)
ftp.login(USER, PASS)
print("Successfully logged in in a plaintext session.")
print("C: CCC")
response = ftp.voidcmd('CCC')
print(f"S: {response}")
except ftplib.error_perm as e:
response = str(e)
print(f"S: {response}")
except Exception as e:
print(f"\nError occurred: {e}")
return
if '533' in response:
print("\n[-] No violation detected. Server correctly responded with 533.")
elif '200' in response or '502' in response:
print(f"\n[!] VIOLATION CONFIRMED: Server responded with '{response.split()[0]}' instead of the required '533'.")
else:
print("\n[?] Server returned an unexpected response.")
if __name__ == "__main__":
verify_ccc_unprotected_reply()

99
test/test3.py Normal file
View File

@ -0,0 +1,99 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 19 14:54:55 2025
@author: ugo
"""
import socket
import ssl
def verify_auth_reauth_violation():
"""
Verify whether the server enforces re-authentication after re-issuing the AUTH command.
"""
HOST = '127.0.0.1'
PORT = 21
USER = 'username'
PASS = 'password'
print("--- Verifying Rule: Re-authentication Required After Re-issuing AUTH Command ---")
try:
# Step 1: Connect and log in in plaintext
print("\n[+] Step 1: Establish plaintext connection and log in...")
sock = socket.create_connection((HOST, PORT), timeout=5)
print(f"S: {sock.recv(1024).decode().strip()}")
sock.sendall(f'USER {USER}\r\n'.encode())
print(f"C: USER {USER}")
print(f"S: {sock.recv(1024).decode().strip()}")
sock.sendall(f'PASS {PASS}\r\n'.encode())
print(f"C: PASS {PASS}")
response_login = sock.recv(1024).decode().strip()
print(f"S: {response_login}")
if not response_login.startswith('230'):
print("[-] Login failed; test cannot continue.")
return
# Step 2: Verify logged-in status
print("\n[+] Step 2: Execute PWD command in plaintext mode to verify login status...")
sock.sendall(b'PWD\r\n')
print("C: PWD")
response_pwd1 = sock.recv(1024).decode().strip()
print(f"S: {response_pwd1}")
if not response_pwd1.startswith('257'):
print("[-] PWD command failed; cannot confirm login status.")
return
print(" -> Status confirmed: User is logged in.")
# Step 3: Re-issue AUTH command to reset security state
print("\n[+] Step 3: Re-issue AUTH TLS command to upgrade connection to encrypted mode...")
sock.sendall(b'AUTH TLS\r\n')
print("C: AUTH TLS")
response_auth = sock.recv(1024).decode().strip()
print(f"S: {response_auth}")
if not response_auth.startswith('234'):
print(f"[-] AUTH TLS command failed: {response_auth}")
return
# Create SSL context that does not verify certificates
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Wrap regular socket into SSL socket
ssock = context.wrap_socket(sock, server_hostname=HOST)
print(" -> TLS handshake successful; connection is encrypted.")
# Step 4: Attempt authenticated command again without re-authenticating
print("\n[+] Step 4: In encrypted mode, execute PWD command directly without re-logging in...")
ssock.sendall(b'PWD\r\n')
print("C: PWD")
response_pwd2 = ssock.recv(1024).decode().strip()
print(f"S: {response_pwd2}")
# Final analysis
print("\n--- Final Analysis ---")
if response_pwd2.startswith('257'):
print("[!] VIOLATION CONFIRMED: Server accepted the command, proving user login state was retained!")
print("[!] This is a high-risk authentication bypass vulnerability.")
elif response_pwd2.startswith('530'):
print("[-] NO VIOLATION: Server correctly rejected the command, requiring re-authentication.")
else:
print(f"[?] UNEXPECTED RESPONSE: Server returned an unexpected response code: {response_pwd2}")
except Exception as e:
print(f"\nError occurred: {e}")
finally:
if 'ssock' in locals():
ssock.close()
elif 'sock' in locals():
sock.close()
if __name__ == "__main__":
verify_auth_reauth_violation()

116
test/test4.py Normal file
View File

@ -0,0 +1,116 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 19 15:07:22 2025
@author: ugo
"""
import socket
def verify_feat_space_indent():
"""
Verify whether each feature in the FEAT response correctly starts with a single space.
"""
HOST = '127.0.0.1'
PORT = 21
USER = 'username'
PASS = 'password'
print("\n--- 8. Verifying Rule: Missing Space Before FEAT Features ---")
try:
# Use raw socket connection for better reliability
sock = socket.create_connection((HOST, PORT), timeout=10)
# Read welcome message
welcome = sock.recv(1024).decode().strip()
print(f"S: {welcome}")
# Login
sock.sendall(b'USER username\r\n')
resp = sock.recv(1024).decode().strip()
print(f"C: USER username\nS: {resp}")
sock.sendall(b'PASS password\r\n')
resp = sock.recv(1024).decode().strip()
print(f"C: PASS password\nS: {resp}")
if not resp.startswith('230'):
print("Login failed")
return
# Send FEAT command
print("C: FEAT")
sock.sendall(b'FEAT\r\n')
# Read complete FEAT response
full_response = b''
while True:
data = sock.recv(1024)
if not data:
break
full_response += data
# Check for end marker "211 End."
if b'211 End.' in full_response:
break
# Parse response
response_text = full_response.decode('utf-8', errors='ignore')
lines = response_text.split('\r\n')
print("S: Full response:")
for line in lines:
if line.strip(): # Print only non-empty lines
print(f" {line}")
# Find feature lines (between 211- and 211 End.)
feature_lines = []
in_features = False
for line in lines:
if line.startswith('211-'):
in_features = True
continue
elif line.startswith('211 '):
break
elif in_features and line.strip():
feature_lines.append(line)
print(f"\nParsed feature lines ({len(feature_lines)} lines):")
for line in feature_lines:
print(f" '{line}'")
# Check for violations
violation_found = False
if not feature_lines:
print("No feature lines found.")
return
for line in feature_lines:
# Per RFC 2389, each feature line must start with a single space
if not line.startswith(' '):
print(f"\n[!] VIOLATION CONFIRMED: Feature line '{line.strip()}' is missing the required leading space.")
print(f" Original line: '{line}' (length: {len(line)})")
violation_found = True
else:
print(f"[-] Correct format: '{line.strip()}' (starts with space)")
if not violation_found:
print("\n[-] No violations detected. All feature lines are correctly formatted.")
else:
print(f"\n[!] RFC 2389 violation found: Feature lines must start with a space!")
except Exception as e:
print(f"\nError occurred: {e}")
import traceback
traceback.print_exc()
finally:
if 'sock' in locals():
try:
sock.close()
except:
pass
if __name__ == "__main__":
verify_feat_space_indent()

48
test/test5.py Normal file
View File

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 19 15:44:40 2025
@author: ugo
"""
import ftplib
def test_utf8_mkd(server, port, user, password, dir_name_utf8):
try:
with ftplib.FTP() as ftp:
ftp.connect(server, port, timeout=5)
ftp.login(user, password)
print(f"C: MKD {dir_name_utf8}")
response = ftp.mkd(dir_name_utf8)
print(f"S: {response}")
# Clean up after test
ftp.rmd(dir_name_utf8)
print(f"Removed directory: {dir_name_utf8}")
except ftplib.error_perm as e:
print(f"FTP permission error: {e}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
# Example UTF-8 directory names to test:
test_dirs = [
"Café",
"测试",
"директория",
"データ",
"résumé"
]
# Update with your FTP server info:
FTP_SERVER = "127.0.0.1"
FTP_PORT = 21
FTP_USER = "username"
FTP_PASS = "password"
for d in test_dirs:
print("\n--- Testing MKD with UTF-8 directory ---")
test_utf8_mkd(FTP_SERVER, FTP_PORT, FTP_USER, FTP_PASS, d)

39
test/test6.py Normal file
View File

@ -0,0 +1,39 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 19 15:52:21 2025
@author: ugo
"""
import ftplib
def test_opts_utf8(server, port, user, password):
try:
with ftplib.FTP() as ftp:
ftp.connect(server, port, timeout=5)
ftp.login(user, password)
# Send OPTS UTF8 ON command
print("C: OPTS UTF8 ON")
response = ftp.sendcmd("OPTS UTF8 ON")
print(f"S: {response}")
# Check if server responded with 200
if response.startswith('200'):
print("OPTS UTF8 command is supported and accepted.")
else:
print("OPTS UTF8 command is not supported or rejected.")
except ftplib.error_perm as e:
print(f"FTP permission error: {e}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
FTP_SERVER = "127.0.0.1"
FTP_PORT = 21
FTP_USER = "username"
FTP_PASS = "password"
test_opts_utf8(FTP_SERVER, FTP_PORT, FTP_USER, FTP_PASS)

81
test/test7.py Normal file
View File

@ -0,0 +1,81 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 19 16:28:50 2025
@author: ugo
"""
import socket
import ssl
def verify_pbsz_prot_violations():
"""
Combined verification of multiple sequence and validation violations related to PBSZ and PROT.
"""
HOST = '127.0.0.1'
PORT = 21
USER = 'username'
PASS = 'password'
print("\n--- 12-15. Verifying Rules: PBSZ/PROT Command Sequences and Validation ---")
# Scenario 1: Invoke PBSZ without prior security exchange
print("\n[+] Scenario 1: Invoke PBSZ on plaintext connection")
try:
with socket.create_connection((HOST, PORT), timeout=5) as sock:
sock.recv(1024)
sock.sendall(b'PBSZ 0\r\n')
print("C: PBSZ 0")
response = sock.recv(1024).decode().strip()
print(f"S: {response}")
if response.startswith('200'):
print(" -> VIOLATION CONFIRMED: Server accepted PBSZ without prior security exchange.")
else:
print(" -> No violation detected for this case.")
except Exception as e:
print(f" -> Error occurred: {e}")
# Scenario 2: Invoke PBSZ with invalid parameters
print("\n[+] Scenario 2: Invoke PBSZ with invalid parameters")
try:
# Need a TLS connection to proceed
sock = socket.create_connection((HOST, PORT), timeout=5)
sock.recv(1024)
sock.sendall(b'AUTH TLS\r\n')
sock.recv(1024)
# Create SSL context that doesn't verify certificates
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
ssock = context.wrap_socket(sock, server_hostname=HOST)
ssock.sendall(b'PBSZ not_a_number\r\n')
print("C: PBSZ not_a_number")
response = ssock.recv(1024).decode().strip()
print(f"S: {response}")
if response.startswith('200'):
print(" -> VIOLATION CONFIRMED: Server responded with 200 OK to invalid PBSZ parameter instead of 501.")
else:
print(" -> No violation detected for this case.")
# Scenario 3: Invoke PROT without calling PBSZ first
print("\n[+] Scenario 3: Invoke PROT on TLS connection without prior PBSZ")
ssock.sendall(b'PROT P\r\n')
print("C: PROT P")
response_prot = ssock.recv(1024).decode().strip()
print(f"S: {response_prot}")
if response_prot.startswith('200'):
print(" -> VIOLATION CONFIRMED: Server accepted PROT without prior PBSZ.")
else:
print(" -> No violation detected for this case.")
except Exception as e:
print(f"\nError occurred: {e}")
finally:
if 'ssock' in locals():
ssock.close()
if __name__ == "__main__":
verify_pbsz_prot_violations()

139
test/test8.py Normal file
View File

@ -0,0 +1,139 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 19 18:15:07 2025
@author: ugo
"""
import socket
import ssl
import ftplib
from io import BytesIO
def log_command(direction, command, response=None):
"""Log FTP commands and responses"""
if direction == "C":
print(f"C: {command}")
elif direction == "S":
print(f"S: {response}")
elif direction == "ERROR":
print(f"ERROR: {command}")
elif direction == "INFO":
print(f"INFO: {command}")
def verify_rest_violations():
"""
Verify REST-related sequence and implementation violations.
"""
HOST = '127.0.0.1'
PORT = 21
USER = 'username'
PASS = 'password'
FILE = "test_rest.txt"
CONTENT = b"1234567890"
print("\n--- Verifying Rules: REST Command Sequence and Implementation ---")
print("Test Objective: Verify that the REST command correctly limits subsequent commands and handles offsets")
try:
with ftplib.FTP() as ftp:
# Enable debug mode to show all FTP interactions
ftp.set_debuglevel(0) # We handle display ourselves
log_command("INFO", f"Connecting to {HOST}:{PORT}")
ftp.connect(HOST, PORT, timeout=5)
log_command("C", f"USER {USER}")
ftp.login(USER, PASS)
log_command("INFO", "Login successful")
log_command("C", "TYPE I")
resp = ftp.sendcmd("TYPE I")
log_command("S", resp)
# Upload test file
log_command("C", f"STOR {FILE}")
ftp.storbinary(f"STOR {FILE}", BytesIO(CONTENT))
log_command("INFO", f"File uploaded successfully, content: {CONTENT} (length: {len(CONTENT)})")
print("\n" + "="*60)
# Scenario 1: Verify if RETR correctly handles the offset set by REST
print("[+] Scenario 1: Verify if RETR correctly handles the offset set by REST")
print(" RFC Requirement: RETR should start transferring from the byte position specified by REST")
# Reset REST point
rest_offset = 5
log_command("C", f"REST {rest_offset}")
try:
response_rest2 = ftp.sendcmd(f"REST {rest_offset}")
log_command("S", response_rest2)
if not response_rest2.startswith('350'):
print(f" REST {rest_offset} failed: {response_rest2}")
print(f" Expected: 350 Restarting at {rest_offset}")
return
else:
print(f" REST set offset to byte {rest_offset}")
except ftplib.error_perm as e:
log_command("ERROR", f"REST {rest_offset} failed: {e}")
return
# Download file
log_command("C", f"RETR {FILE}")
out = BytesIO()
ftp.retrbinary(f"RETR {FILE}", out.write)
retrieved_content = out.getvalue()
# Detailed analysis of results
print(f"\n Transfer Result Analysis:")
print(f" Original File Content: {CONTENT} (length: {len(CONTENT)})")
print(f" REST Offset: {rest_offset} (starting from byte {rest_offset})")
print(f" Expected Transferred Content: {CONTENT[rest_offset:]} (length: {len(CONTENT[rest_offset:])})")
print(f" Actual Transferred Content: {retrieved_content} (length: {len(retrieved_content)})")
if len(retrieved_content) == len(CONTENT):
print("\n VIOLATION CONFIRMED: RETR implementation ignored the offset set by REST")
print(" Actual Result: Downloaded the entire file, ignoring the REST offset")
print(f" Expected Result: Should only download {CONTENT[rest_offset:]} ({len(CONTENT[rest_offset:])} bytes)")
print(" Violation Explanation: Violates RFC 959 regarding the REST command")
elif len(retrieved_content) == len(CONTENT) - rest_offset:
if retrieved_content == CONTENT[rest_offset:]:
print(" No violation detected. RETR correctly handled the REST offset.")
else:
print(" VIOLATION CONFIRMED: Content mismatch, REST offset handled incorrectly")
print(f" Actual Content: {retrieved_content}")
print(f" Expected Content: {CONTENT[rest_offset:]}")
print(" Violation Explanation: Incorrect REST offset calculation")
elif len(retrieved_content) > len(CONTENT):
print(" VIOLATION CONFIRMED: Retrieved data length exceeds original file size")
print(f" Actual Length: {len(retrieved_content)} bytes")
print(f" Expected Maximum Length: {len(CONTENT)} bytes")
print(f" Expected Length after REST: {len(CONTENT[rest_offset:])} bytes")
print(" Violation Explanation: Severe data transfer anomaly")
else:
print(f" VIOLATION CONFIRMED: Unexpected retrieved content length")
print(f" Actual Length: {len(retrieved_content)} bytes")
print(f" Expected Length: {len(CONTENT[rest_offset:])} bytes")
print(" Violation Explanation: Problem with REST command implementation")
# Clean up test file
try:
log_command("C", f"DELE {FILE}")
#ftp.delete(FILE)
log_command("INFO", "Test file cleanup completed")
except:
log_command("ERROR", "Failed to cleanup test file")
except Exception as e:
print(f"\nError occurred: {e}")
import traceback
print("Detailed error information:")
traceback.print_exc()
print("\nPossible causes:")
print(" - Server connection issues")
print(" - Insufficient permissions")
print(" - Server does not support certain commands")
if __name__ == "__main__":
verify_rest_violations()