I am writing an FTP server using the Python framework Twisted. Twisted has its own plain FTP implementation, but it doesn't support FTPS. I've noticed that most clients connect and immediately issue an `AUTH TLS` command, requesting an encrypted FTPS connection. If the server responds that this command is not supported, they just disconnect.
There are third-party libraries that implement an implicit FTPS server (i.e., the client connects via FTPS right off the bat) like this one - but this is not what I need. I need explicit FTPS support - i.e., to switch to a TLS connection from within an FTP connection when the `AUTH TLS` command is received.
Any ideas how to do this?
P.S. Edited to correctly use explicit/implicit.
OK, I had confused the FTPS types; I actually need explicit FTPS (i.e., the handling of the `AUTH TLS` command); thanks to Martin Prikryl for pointing this out.
Meanwhile, I figured out how to solve my problem. Please note that for FTPS you need a certificate (assumed to be stored in the file `server.pem` in the code below) - and, specifically, if you're going to use it with Twisted, it has to include the SAN extensions. For information on how to create a self-signed certificate that contains SAN extensions, see this guide (backup link).
Here is the basic code that implements what I want:
from os.path import isfile
from sys import version_info
from twisted.internet.ssl import PrivateCertificate
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet.reactor import listenTCP, run
from twisted.python.log import msg
# Text/bytes shim: the protocol logic works on native ``str`` objects while
# the transport works on bytes. On Python 3 these differ, on Python 2 they
# are the same type, so the helpers are identity functions there.
if version_info[0] >= 3:
    def decode(x):
        """Convert bytes received from the wire to ``str``.

        Bytes that are not valid UTF-8 are dropped rather than raising.
        """
        return x.decode('utf-8', errors='ignore')

    def encode(x):
        """Convert a ``str`` to UTF-8 bytes for writing to the transport."""
        return x.encode('utf-8')
else:
    def decode(x):
        """Return *x* unchanged (native strings are already bytes)."""
        return x

    def encode(x):
        """Return *x* unchanged (native strings are already bytes)."""
        return x
class MyFTPServer(LineReceiver):
    """Minimal FTP control-channel protocol with explicit FTPS support.

    Only ``AUTH`` (switch the connection to TLS) and ``CCC`` (switch back
    to clear text) are implemented; any other command is answered with a
    500 reply.
    """

    # Mechanism names accepted as the argument of the AUTH command.
    _TLS_MECHANISMS = frozenset(('TLS', 'TLS-C', 'SSL', 'TLS-P'))

    def __init__(self, options):
        """Remember the TLS settings for this connection.

        :param options: object handed to ``transport.startTLS`` when the
            client sends AUTH, or ``None`` if TLS cannot be offered.
        """
        self.options = options
        # True while the control channel has been switched to TLS.
        self.is_fttps = False

    def connectionMade(self):
        """Send the FTP greeting once the connection is established."""
        self.transport.write(b'220 MyFTP server\r\n')

    def lineReceived(self, line):
        """Split one received line into command and argument and dispatch it.

        :param line: the raw line as delivered by ``LineReceiver``.
        """
        parts = decode(line).split(None, 1)
        if not parts:
            # Blank line: there is no command to dispatch.
            return
        command = parts[0].upper()
        args = parts[1] if len(parts) > 1 else ''
        self.process_command(command, args)

    def connectionLost(self, reason):
        """Reset the TLS flag when the connection goes away."""
        self.is_fttps = False

    def process_command(self, command, args):
        """Execute a single FTP command and write its reply.

        :param command: upper-cased command verb.
        :param args: remainder of the line ('' when absent).
        """
        if command == 'AUTH':
            self._handle_auth(args)
        elif command == 'CCC':
            self._handle_ccc()
        # Further FTP commands would be dispatched here.
        else:
            self.transport.write(
                b'500 ' + encode(command) + b' not understood\r\n')

    def _handle_auth(self, args):
        """Reply to AUTH and, when possible, switch the channel to TLS."""
        if not args:
            self.transport.write(
                b'504 AUTH requires at least one argument\r\n')
        elif args.upper().strip() not in self._TLS_MECHANISMS:
            # RFC 2228: an unknown security mechanism is answered with
            # 504, not with the generic 500 syntax error.
            self.transport.write(b'504 AUTH mechanism not understood\r\n')
        elif self.is_fttps:
            self.transport.write(b'200 User is already authenticated.\r\n')
        elif self.options is not None:
            # The 234 reply is written first, then the switch is requested.
            self.transport.write(b'234 AUTH TLS successful\r\n')
            self.transport.startTLS(self.options)
            self.is_fttps = True
        else:
            # No certificate was loaded, so TLS cannot be offered.
            self.transport.write(b'500 AUTH not understood\r\n')

    def _handle_ccc(self):
        """Reply to CCC and return the control channel to clear text."""
        if not self.is_fttps:
            self.transport.write(
                b'533 Command channel is already cleared\r\n')
            return
        self.transport.write(b'200 Clear Command Channel OK\r\n')
        # NOTE(review): assumes the transport provides stopTLS(); confirm
        # against the Twisted version in use before relying on CCC.
        self.transport.stopTLS()
        self.is_fttps = False
class MyFTPFactory(ServerFactory):
    """Factory creating ``MyFTPServer`` protocols that share one TLS setup."""

    def __init__(self, certfile):
        """Load the certificate used to answer AUTH requests.

        :param certfile: path of a PEM file whose contents are passed to
            ``PrivateCertificate.loadPEM``. When the file is missing,
            unreadable or empty, ``self.options`` is left as ``None``.
        """
        self.options = None
        if not isfile(certfile):
            msg('Certificate file "{}" not found; TLS is disabled.'.format(
                certfile))
            return
        try:
            with open(certfile) as f:
                cert_data = f.read()
        except (IOError, OSError):
            # IOError is listed explicitly: on Python 2 open() raises
            # IOError, which is not a subclass of OSError there.
            msg('Could not read the file "{}".'.format(certfile))
            return
        if cert_data:
            self.options = PrivateCertificate.loadPEM(cert_data).options()

    def buildProtocol(self, addr):
        """Return a new protocol instance for an incoming connection.

        :param addr: address of the connecting peer (unused).
        """
        return MyFTPServer(self.options)
def main(port=21, certfile='server.pem'):
    """Start the FTP server and run the reactor.

    :param port: TCP port to listen on.
    :param certfile: path of the PEM file given to ``MyFTPFactory``.
    """
    listenTCP(port, MyFTPFactory(certfile))
    run()
    # Reached only after run() has returned.
    msg('Shutdown requested, exiting...')
# Start the server only when executed as a script, not when imported.
if __name__ == '__main__':
    main()