
Using Twisted to implement an explicit FTPS server


I am writing an FTP server using the Python framework Twisted. Twisted ships its own plain FTP implementation, but it doesn't support FTPS. I've noticed that most clients connect and immediately issue an AUTH TLS command to request an encrypted FTPS connection. If the server responds that the command is not supported, they simply disconnect.

There are third-party libraries that implement an implicit FTPS server (i.e., the client connects over TLS right off the bat), like this one, but this is not what I need. I need explicit FTPS support, i.e., switching to a TLS connection from within a plain FTP connection when the AUTH TLS command is received.
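To make the distinction concrete, here is a minimal client-side sketch of the explicit flow, using `ftplib.FTP_TLS` from the Python standard library. The function name `probe_explicit_ftps` and the two port constants are mine, chosen for illustration. The sketch assumes a server listening on the given host and port, and it relies on `ftplib`'s default TLS context, which as far as I know does not verify the server certificate.

```python
from ftplib import FTP_TLS

# Conventional ports: explicit FTPS starts on the plain FTP port,
# implicit FTPS expects a TLS handshake immediately on connect.
EXPLICIT_FTPS_PORT = 21
IMPLICIT_FTPS_PORT = 990


def probe_explicit_ftps(host, port=EXPLICIT_FTPS_PORT, timeout=10):
    """Connect in plaintext, then upgrade the control channel via AUTH TLS.

    Returns the server greeting and the reply to the AUTH command.
    Raises an ftplib error if the server rejects AUTH.
    """
    ftps = FTP_TLS(timeout=timeout)
    try:
        # Plain TCP connection; the 220 greeting arrives unencrypted.
        welcome = ftps.connect(host, port)
        # Sends the AUTH command and, on a 2xx reply, wraps the socket in TLS.
        reply = ftps.auth()
        return welcome, reply
    finally:
        ftps.close()
```

Against a server that answers AUTH with a 5xx reply, `ftps.auth()` raises `ftplib.error_perm`, which matches the "clients just disconnect" behaviour described above.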

Any ideas how to do this?

P.S. Edited to correctly use explicit/implicit.


Solution

  • OK, I have confused the FTPS types; I actually need explicit FTPS (i.e., the handling of the AUTH TLS command); thanks to Martin Prikryl for pointing this out.

    Meanwhile, I figured out how to solve my problem. Please note that FTPS requires a certificate (the code below assumes it is stored, together with its private key, in the file server.pem). Specifically, if you're going to use it with Twisted, it has to include the SAN (Subject Alternative Name) extension. For information on how to create a self-signed certificate that contains SAN extensions, see this guide (backup link).

    Here is the basic code that implements what I want:

    
    from os.path import isfile
    from sys import version_info
    
    from twisted.internet.ssl import PrivateCertificate
    from twisted.internet.protocol import ServerFactory
    from twisted.protocols.basic import LineReceiver
    from twisted.internet.reactor import listenTCP, run
    from twisted.python.log import msg
    
    
    if version_info[0] >= 3:
        def decode(x):
            return x.decode('utf-8', errors='ignore')
        def encode(x):
            return x.encode()
    else:
        def decode(x):
            return x
        def encode(x):
            return x
    
    
    class MyFTPServer(LineReceiver):
        def __init__(self, options):
            # TLS context factory, or None if no certificate could be loaded.
            self.options = options
            # True once the control channel has been upgraded to TLS.
            self.is_ftps = False
    
        def connectionMade(self):
            self.transport.write(b'220 MyFTP server\r\n')
    
        def lineReceived(self, line):
            line = decode(line)
            parts = line.split(None, 1)
            if parts:
                command = parts[0].upper()
                args = parts[1] if len(parts) > 1 else ''
                self.process_command(command, args)
    
        def connectionLost(self, reason):
            self.is_ftps = False
    
        def process_command(self, command, args):
            if command == 'AUTH':
                if len(args) == 0:
                    self.transport.write(b'504 AUTH requires at least one argument\r\n')
                elif args.upper().strip() not in ['TLS', 'TLS-C', 'SSL', 'TLS-P']:
                    self.transport.write(b'500 AUTH not understood\r\n')
                elif self.is_ftps:
                    self.transport.write(b'200 User is already authenticated.\r\n')
                elif self.options is not None:
                    # The reply must be written before startTLS() so that it
                    # still goes out in plaintext; everything after is encrypted.
                    self.transport.write(b'234 AUTH TLS successful\r\n')
                    self.transport.startTLS(self.options)
                    self.is_ftps = True
                else:
                    # No usable certificate was loaded, so TLS cannot be offered.
                    self.transport.write(b'500 AUTH not understood\r\n')
            elif command == 'CCC':
                if not self.is_ftps:
                    self.transport.write(b'533 Command channel is already cleared\r\n')
                else:
                    # Twisted transports provide startTLS() but, as far as I can
                    # tell, no stopTLS(), so the channel cannot be downgraded.
                    self.transport.write(b'534 Clear Command Channel is not supported\r\n')
            # elif command == '...':
            #     # Process other commands
            else:
                self.transport.write(b'500 ' + encode(command) + b' not understood\r\n')
    
    
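    class MyFTPFactory(ServerFactory):
        def __init__(self, certfile):
            # Stays None if the certificate cannot be loaded; the protocol
            # then refuses AUTH instead of starting TLS.
            factory_options = None
            if isfile(certfile):
                cert_data = ''
                try:
                    with open(certfile) as f:
                        cert_data = f.read()
                except (IOError, OSError):
                    # On Python 2, open() raises IOError, which is not an OSError.
                    msg('Could not read the file "{}".'.format(certfile))
                if cert_data:
                    # The PEM data must contain both the private key and the certificate.
                    factory_options = PrivateCertificate.loadPEM(cert_data).options()
            self.options = factory_options
    
        def buildProtocol(self, addr):
            return MyFTPServer(self.options)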
    
    
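    def main():
        certfile = 'server.pem'
        # Binding to port 21 usually requires elevated privileges on Unix-like
        # systems; pick a port above 1023 for unprivileged testing.
        listenTCP(21, MyFTPFactory(certfile))
        run()  # Blocks until the reactor is stopped.
        msg('Shutdown requested, exiting...')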
    
    
    if __name__ == '__main__':
        main()
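The AUTH branch above is the heart of explicit FTPS, and it can be exercised without a reactor or a socket if the decision is pulled out into a pure function. The following is only a sketch of that idea. The names `auth_reply` and `TLS_MECHANISMS` are hypothetical (they are not part of Twisted or of the code above), and the reply strings simply mirror the ones used in `process_command`.

```python
# Mechanism names accepted by the AUTH handler above.
TLS_MECHANISMS = ('TLS', 'TLS-C', 'SSL', 'TLS-P')


def auth_reply(args, tls_active, tls_available):
    """Decide how to answer an AUTH command.

    args          -- the text following the AUTH verb (may be empty)
    tls_active    -- True if the control channel is already encrypted
    tls_available -- True if a TLS context factory was loaded

    Returns (reply, start_tls): the bytes to send, and whether the caller
    should call transport.startTLS() right after writing the reply.
    """
    mechanism = args.strip().upper()
    if not mechanism:
        return b'504 AUTH requires at least one argument\r\n', False
    if mechanism not in TLS_MECHANISMS:
        return b'500 AUTH not understood\r\n', False
    if tls_active:
        return b'200 User is already authenticated.\r\n', False
    if tls_available:
        return b'234 AUTH TLS successful\r\n', True
    # No certificate loaded: TLS cannot be offered.
    return b'500 AUTH not understood\r\n', False
```

A protocol could then write the returned reply and call `startTLS()` only when the second element is true, which keeps the ordering (plaintext reply first, handshake second) in one place.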