When I trigger some code using reactor.callInThread, it seems like the reactor does not fire until some other scheduled event occurs (an auto-ping in this case).
Depending on when the auto-ping lines up with my subscribe request, I can see response times between 0 and 5 seconds. Manually editing autobahn/twisted/wamp.py to change transport_factory.setProtocolOptions( ..., autoPingInterval=10., ...) to something sub-second provides a new upper bound.
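The bound can be illustrated with a little arithmetic. This is only a simplified model, assuming the reactor wakes exactly at multiples of some interval and that nothing else wakes it in between; the function name delay_until_next_wakeup is made up for the illustration.

```python
def delay_until_next_wakeup(request_time, interval):
    """Seconds a request waits if the loop only wakes on a periodic timer.

    Assumption: wake-ups happen exactly at multiples of `interval`.
    """
    return (-request_time) % interval


# Requests arriving at different offsets within a 5-second wake-up period.
for offset in (0.5, 2.0, 4.5):
    waited = delay_until_next_wakeup(offset, 5.0)
    print("arrives at +%.1fs, waits %.1fs" % (offset, waited))
```

Under this model the wait is always at least 0 and less than the interval, which matches the observation that shrinking the interval shrinks the worst case.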
I ran tcpdump and the below code (simultaneously in the same terminal - indented tcpdump output for clarity) and got output like the following. Notice that tcpdump indicates that all the messages are sent ~simultaneously, and that neither the server nor client delays responses (it is just the first client send that is quite far from the initial trigger / log message):
2017-10-30T16:26:44-0700 Auto ping/pong: sending ping auto-ping/pong
2017-10-30T16:26:44-0700 Expecting ping in 5.0 seconds for auto-ping/pong
2017-10-30T16:26:44-0700 Auto ping/pong: received pending pong for auto-ping/pong
2017-10-30T16:26:44-0700 WebSocketProtocol.onPong(payload=<4 bytes>)
    16:26:44.000880 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2772:2811, ack 8850, win 4096, options [nop,nop,TS val 826034469 ecr 12606734], length 39
    16:26:44.004235 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8850:8885, ack 2811, win 285, options [nop,nop,TS val 12613555 ecr 826034469], length 35
    16:26:44.004282 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8885, win 4094, options [nop,nop,TS val 826034472 ecr 12613555], length 0
2017-10-30T16:26:44-0700 WAMP SEND: message=Subscribe(XXX)
<-------------------->
Five Seconds Pass
<-------------------->
2017-10-30T16:26:49-0700 WAMP RECV: message=Subscribed(XXX)
2017-10-30T16:26:49-0700 WAMP SEND: message=Call(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Result(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700
    16:26:49.000617 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2811:2884, ack 8885, win 4096, options [nop,nop,TS val 826039448 ecr 12613555], length 73
    16:26:49.004748 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8885:8939, ack 2884, win 285, options [nop,nop,TS val 12618555 ecr 826039448], length 54
    16:26:49.004797 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8939, win 4094, options [nop,nop,TS val 826039452 ecr 12618555], length 0
    16:26:49.006799 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2884:3537, ack 8939, win 4096, options [nop,nop,TS val 826039454 ecr 12618555], length 653
    16:26:49.009960 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8939:9000, ack 3537, win 299, options [nop,nop,TS val 12618561 ecr 826039454], length 61
    16:26:49.010004 IP CLIENT.58323 > SERVER.443: Flags [.], ack 9000, win 4094, options [nop,nop,TS val 826039457 ecr 12618561], length 0
    16:26:49.171613 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 9000:10329, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1329
    16:26:49.171616 IP SERVER.443 > CLIENT.58323: Flags [.], seq 10329:11777, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1448
    16:26:49.171618 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 11777:11857, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 80
    16:26:49.171663 IP CLIENT.58323 > SERVER.443: Flags [.], ack 10329, win 4054, options [nop,nop,TS val 826039617 ecr 12618723], length 0
    16:26:49.171678 IP CLIENT.58323 > SERVER.443: Flags [.], ack 11857, win 4006, options [nop,nop,TS val 826039617 ecr 12618723], length 0
2017-10-30T16:26:50-0700 Result: XXX (4.99s)
2017-10-30T16:26:50-0700
2017-10-30T16:26:50-0700 Sleeping
The below example reproduces the issue against my local server - heavily cut down from the original code but still with a few notable quirks:
* Using SSL (needed, happy to be informed of better technique)
* In a thread (~needed, using in spawned processes in a library for interaction)
* Uses config.extra for state sharing (happy to be informed of better technique)
Consider the values in caps to be redacted but irrelevant.
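import Queue
import threading
import time

import autobahn
from autobahn.twisted.wamp import ApplicationRunner, ApplicationSession
from autobahn.wamp.types import ComponentConfig
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.ssl import PrivateCertificate
from twisted.logger import Logger

assert autobahn.__version__ == '17.6.2'

# Assumption: a module-level logger; the original snippet does not show
# where `log` comes from.
log = Logger()
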

class WampSession(ApplicationSession):
    @inlineCallbacks
    def do_work(self, args):
        self.config.extra['joined'].wait()
        yield self.subscribe(self._get_onResult(), args['resultTopic'])
        args['authentication'] = self.config.extra['authentication']
        yield self.call(FUNC, **args)

    def _get_onResult(self):
        def onResult(**kw):
            if kw['status'] == 'done':
                self.config.extra['result_queue'].put(kw['requestID'])
        return onResult

    def onConnect(self):
        self.join(REALM, authmethods=[u'ticket'])

    def onJoin(self, details):
        self.config.extra['joined'].set()

    def onChallenge(self, challenge):
        return self.config.extra['authentication']

    def onDisconnect(self):
        log.debug('onDisconnect')
        reactor.stop()


def get_session_runner():
    extra = {
        'result_queue': Queue.Queue(),
        'joined': threading.Event(),
        'authentication': ACCESS_TOKEN,
    }
    session = WampSession(ComponentConfig(extra=extra))
    certificate = PrivateCertificate.loadPEM(CERT_DATA)
    ssl = certificate.options()
    runner = ApplicationRunner(url=URL, ssl=ssl)
    return session, runner


def main():
    session, runner = get_session_runner()

    # Monkeypatch this to make it run in a thread, since ApplicationRunner
    # calls 'run' without arguments, and we need to skip signal handlers.
    def _run_threaded():
        return reactor._run_original(installSignalHandlers=False)
    reactor._run_original = reactor.run
    reactor.run = _run_threaded

    t = threading.Thread(target=runner.run, args=(session,))
    t.daemon = True
    t.start()

    orig = FUNC_ARGS['resultTopic']
    for i in xrange(100):
        FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)
        start = time.time()
        reactor.callInThread(session.do_work, FUNC_ARGS)
        result = session.config.extra['result_queue'].get()
        end = time.time()
        print
        time.sleep(1)  # minisleep for tcpdump logging delay
        print 'Result: %s (%.2fs)' % (result, end - start)
        print '\nSleeping\n'
        time.sleep(6)
        print '\nDone Sleeping\n'

This seems to work well after modifications based upon Jean-Paul's answer (moved calls into a thread instead of reactor, use callFromThread):

def main():
    session, runner = get_session_runner()
    t = threading.Thread(target=threadstuff, args=(session,))
    t.start()
    runner.run(session)


def threadstuff(session):
    orig = FUNC_ARGS['resultTopic']
    session.config.extra['joined'].wait()
    for i in xrange(100):
        FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)
        start = time.time()
        reactor.callFromThread(session.do_work, FUNC_ARGS)
        result = session.config.extra['result_queue'].get()
        end = time.time()
        print
        time.sleep(1)  # minisleep for tcpdump logging delay
        print 'Result: %s (%.2fs)' % (result, end - start)
        print '\nSleeping\n'
        time.sleep(6)
        print '\nDone Sleeping\n'
    # reactor.stop() must run in the reactor thread, so schedule it from
    # this worker thread instead of calling it directly.
    reactor.callFromThread(reactor.stop)

callInThread is the scheduling API that lets you call a given function in a thread from the reactor's threadpool. It must be called from the reactor thread.
callFromThread is the scheduling API that lets you call a given function in the reactor thread. It is typically called from a non-reactor thread.
You should be using callFromThread here since you're trying to schedule work from a non-reactor thread.
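To make the difference concrete, here is a rough stdlib-only sketch of why the wake-up matters. It is a toy model and not Twisted's actual implementation: ToyReactor, schedule_without_wakeup, schedule_with_wakeup and measure_latency are all made-up names. The loop sleeps in select() until either a periodic timer expires (standing in for the auto-ping) or a byte arrives on a waker socket, which is roughly the role callFromThread plays when it queues a call and wakes the reactor.

```python
import collections
import select
import socket
import threading
import time


class ToyReactor(object):
    """Toy event loop: sleeps in select() until a tick is due or it is woken."""

    def __init__(self, tick_interval):
        self.tick_interval = tick_interval
        self.pending = collections.deque()
        # Waker socket pair: writing a byte makes select() return early.
        self._wake_recv, self._wake_send = socket.socketpair()
        self.running = True

    def schedule_without_wakeup(self, func):
        # Queue work but leave the loop asleep; it runs on the next tick.
        self.pending.append(func)

    def schedule_with_wakeup(self, func):
        # Queue work and wake the loop right away.
        self.pending.append(func)
        self._wake_send.send(b"x")

    def stop(self):
        self.running = False

    def run(self):
        while self.running:
            readable, _, _ = select.select(
                [self._wake_recv], [], [], self.tick_interval)
            if readable:
                self._wake_recv.recv(1024)  # drain the wake-up byte(s)
            while self.pending:
                self.pending.popleft()()
        self._wake_recv.close()
        self._wake_send.close()


def measure_latency(use_wakeup, tick_interval=1.0):
    """Seconds between scheduling a callback and the loop running it."""
    loop = ToyReactor(tick_interval)
    loop_thread = threading.Thread(target=loop.run)
    loop_thread.start()
    time.sleep(0.1)  # give the loop time to block in select()
    done = threading.Event()
    if use_wakeup:
        schedule = loop.schedule_with_wakeup
    else:
        schedule = loop.schedule_without_wakeup
    start = time.time()
    schedule(done.set)
    done.wait()
    latency = time.time() - start
    loop.schedule_with_wakeup(loop.stop)  # shut the loop down cleanly
    loop_thread.join()
    return latency


if __name__ == "__main__":
    print("without wake-up: %.2fs" % measure_latency(False))
    print("with wake-up:    %.2fs" % measure_latency(True))
```

Without the wake-up, the callback should sit in the queue until the next tick, much like the Subscribe in the question waiting for the next auto-ping; with it, the callback should run almost immediately.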