twisted python-multithreading twistd

Correct use of a threading-based API in daemonized Twisted


I'm using a service whose API uses threading.Thread and I want to use Deferreds inside it.

If I run it as a plain Python script, I have no problem. Something like:

from twisted.internet import reactor
from outside import ServiceUsingThreadingAndDeferred

service = ServiceUsingThreadingAndDeferred()

reactor.listenTCP(port, protocol_factory)

service.start()

reactor.run()

But if I run the following .tac with twistd -y, the service simply does not work:

from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred

# Named api_service so it does not shadow the imported `service` module,
# which is still needed for service.Application below.
api_service = ServiceUsingThreadingAndDeferred()

# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)

api_service.start()

I think the problem in the second case is that the main reactor thread is not spawning the service threads itself, but then I don't see why (or whether) that happens in the first case. As a workaround I used callLater, which worked:

from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred
from twisted.internet import reactor

# Named api_service so it does not shadow the imported `service` module.
api_service = ServiceUsingThreadingAndDeferred()

# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)

# Workaround: delay the start until one second after the reactor is running.
reactor.callLater(1, api_service.start)

but I don't know whether this is the correct way to tackle it. Do you have any advice?


Solution

  • From the GitHub repo, this class misuses Twisted's threading API:

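    import threading
    import time

    from twisted.internet import threads


    class ServiceUsingThreadingAndDeferred():
        def __init__(self):
            pass

        def start(self):
            print("3rd party API service starting...")
            self.run_as_thread()

        def run_as_thread(self, *args, **kwargs):
            # Spawns a plain daemon thread, separate from the reactor thread.
            t = threading.Thread(target=self.run_forever, args=args, kwargs=kwargs)
            t.daemon = True
            t.start()

        def run_forever(self):
            # Runs in the thread created above, not in the reactor thread.
            while 1:
                print("Doing something remote...")
                time.sleep(1)
                now = time.time()
                if 1 > now % 5 >= 0:
                    self.defer_activity()

        def defer_activity(self):
            # Misuse: deferToThread is called from a non-reactor thread.
            # (self._activity is not shown in this excerpt.)
            threads.deferToThread(self._activity)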

    ServiceUsingThreadingAndDeferred.run_forever runs in a non-reactor thread. It calls defer_activity, which calls threads.deferToThread. Calling threads.deferToThread from a non-reactor thread is not allowed. There is approximately one Twisted API that is safe to call from a non-reactor thread: reactor.callFromThread, which schedules a call to its argument to run in the reactor thread.

    working.tac does the same thing but gets lucky and so appears to work on some versions of Twisted. It is relying on undefined behavior that results from calling threads.deferToThread in a non-reactor thread interacting with how callLater is implemented. There's no guarantee it works completely or that the behavior is portable across Twisted versions or platforms.

    If you want to use the reactor's threadpool from a non-reactor thread, you need to write something like:

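    from twisted.internet import reactor
    from twisted.internet.threads import (
        blockingCallFromThread,
        deferToThread,
    )

    # Blocks this (non-reactor) thread until the Deferred returned by
    # deferToThread fires, then returns its result or raises its failure.
    result = blockingCallFromThread(reactor, lambda: deferToThread(self._activity))

    Note that blockingCallFromThread waits for the Deferred returned by deferToThread and hands back its result, so the calling thread never touches the Deferred itself. That matters, because you may not use any methods of a Deferred in a non-reactor thread, either.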

    If at all possible, you should rewrite the logic of ServiceUsingThreadingAndDeferred so that it is compatible with the reactor; then you can avoid all of these shenanigans.