I'm using a service whose API uses threading.Thread and I want to use Deferreds inside it.
If I run it like a standard Python module, I don't have any problem. Something like:

    from twisted.internet import reactor
    from outside import ServiceUsingThreadingAndDeferred

    service = ServiceUsingThreadingAndDeferred()
    reactor.listenTCP(port, protocol_factory)
    service.start()
    reactor.run()

However, if I run the following .tac file with twistd -y, the service simply does not work:

    from twisted.application import internet, service
    from outside import ServiceUsingThreadingAndDeferred

    # Named api_service so the instance does not shadow the imported
    # twisted.application.service module used below.
    api_service = ServiceUsingThreadingAndDeferred()

    # Application set-up
    application = service.Application("appName")
    my_server = internet.TCPServer(port, protocol_factory)
    my_server.setServiceParent(application)
    api_service.start()

I think the problem in the second case is that the main reactor thread is not spawning the service threads itself, but then I don't see why (or whether) it happens in the first case. I used callLater as a workaround, successfully:

    from twisted.application import internet, service
    from twisted.internet import reactor
    from outside import ServiceUsingThreadingAndDeferred

    # Named api_service so the instance does not shadow the imported
    # twisted.application.service module used below.
    api_service = ServiceUsingThreadingAndDeferred()

    # Application set-up
    application = service.Application("appName")
    my_server = internet.TCPServer(port, protocol_factory)
    my_server.setServiceParent(application)
    reactor.callLater(1, api_service.start)

but I don't know if it's the correct way to tackle this. Do you have any advice?
From the GitHub repo, this class misuses Twisted's threading API:

    class ServiceUsingThreadingAndDeferred():
        def __init__(self):
            pass

        def start(self):
            print "3rd party API service starting..."
            self.run_as_thread()

        def run_as_thread(self, *args, **kwargs):
            t = threading.Thread(target=self.run_forever, args=args, kwargs=kwargs)
            t.daemon = True
            t.start()

        def run_forever(self):
            while 1:
                print "Doing something remote..."
                time.sleep(1)
                now = time.time()
                if 1 > now % 5 >= 0:
                    self.defer_activity()

        def defer_activity(self):
            threads.deferToThread(self._activity)

ServiceUsingThreadingAndDeferred.run_forever runs in a non-reactor thread. It calls defer_activity, which calls threads.deferToThread. It is not allowed to call threads.deferToThread in a non-reactor thread. There is approximately one Twisted API that it is safe to call in a non-reactor thread: reactor.callFromThread (it schedules a call to its argument to run in the reactor thread).
working.tac does the same thing but gets lucky and so appears to work on some versions of Twisted. It is relying on undefined behavior that results from calling threads.deferToThread in a non-reactor thread interacting with how callLater is implemented. There's no guarantee it works completely or that the behavior is portable across Twisted versions or platforms.
If you want to use the reactor's threadpool from a non-reactor thread, you need to write something like:
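
    from twisted.internet import reactor
    from twisted.internet.threads import (
        blockingCallFromThread,
        deferToThread,
    )

    result = blockingCallFromThread(reactor, lambda: deferToThread(self._activity))

Note that blockingCallFromThread blocks the calling thread until the Deferred returned by deferToThread has fired, then returns its result (or raises its failure), so result is not itself a Deferred. In general, you may not use any methods of a Deferred in a non-reactor thread, either.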
Most likely, if at all possible, you should rewrite the logic of ServiceUsingThreadingAndDeferred so that it is compatible with the reactor, and then you can avoid all of these shenanigans.