pythontwistedtwisted.webtwisted.internet

Unhandled error in Deferred in twisted deferToThread


I have client/server code example from twisted. Now, my requirement is that when a call is made to server from the client - the server defers the call to a thread, who actually replies back to the client and the server can do something else. In simple words, let's say client C1 makes a call to server S1 with a template Temp1. Server defers it to a thread T1. T1 now has to process function A, B, and C and finally get back to client C1. Below is my code for the server.

I am new to twisted and I am getting the error: Unhandled error in Deferred:

from twisted.internet import reactor, protocol, threads

def foo():
    time.sleep(5)
    print('Hello how are you!!!!')

def print_result():
    print('Processing done!!')

def onError():
    print('Error!!!!')

class Echo(protocol.Protocol):
    """This is just about the simplest possible protocol"""
    def process_func(self, data):
        print('hello i am in process_func!!!')
        self.transport.write(data)
        return foo()

    def onErrorfunc(self):
        onError()

    def onProcessDone(self):
        print_result()

    def dataReceived(self, data):
        "As soon as any data is received, write it back."
        # thr = threading.Thread(target=foo, args=(), kwargs={})
        # thr.start()
        d = threads.deferToThread(self.process_func, *data)
        d.addCallback(self.onProcessDone)
        d.addErrback(self.onErrorfunc)
        # do something else here
        # self.transport.write(data)

def main():
    """This runs the protocol on port 8000"""
    factory = protocol.ServerFactory()
    factory.protocol = Echo
    reactor.listenTCP(8000,factory)
    reactor.run()

# this only runs if the module was *not* imported
if __name__ == '__main__':
    main()

Why twisted, because the client/server is written in twisted already and I am doing minor changes. Help is appreciated. Thanks!


Solution

  • It's hard to tell where your Unhandled error in Deferred stems from since your example is overrun with syntax errors but I'll try to use my intuition and rewrite what you were trying to do :). I've made some comments so take a look and see how your code and this code differs.

    import time
    from twisted.internet import reactor, protocol, threads
    
    def foo():
        # this function didn't return anything in your example
        # now it returns a string
        time.sleep(5)
        return 'Hello how are you!!!!'
    
    class Echo(protocol.Protocol):
        def process_func(self, data):
            # data is unused here, typically you would "do something" to data in a thread
            # remember data is a bytes type not string!
            print('hello i am in process_func!!!')
            return foo()
    
        def onErrorfunc(self, failure):
            print('Error: {0}'.format(failure.value))
    
        def onProcessDone(self, result):
            # result is the string returned from process_func()
            # if Python version >= 3 then transport.write arg must be bytes
            self.transport.write(result.encode('utf8'))
            print('Processing done!!')
    
        def dataReceived(self, data):
            d = threads.deferToThread(self.process_func, data)
            d.addCallback(self.onProcessDone)
            d.addErrback(self.onErrorfunc)
    

    Try not to use self.transport.write() in a thread since it's scheduled using Twisted reactor. Instead run it in callbacks after your computations in the thread is complete. Threads should only be used for intensive computations because Twisted gives you tons of options to run code efficiently in a single thread.