I have a Tornado server which I am trying to make synchronous. I have a client which makes two kinds of asynchronous requests to the server at the same time: it pings the server with a heartbeat every 5 seconds, and it makes a GET request for a job whenever it can.
On the server side, there is a thread-safe queue that contains jobs. If the queue is empty, queue.get() blocks for up to 20 seconds. I want the server to hold the connection open during that time and, if the wait times out, write "No job" to the client. As soon as a job becomes available, it should be written to the client immediately, since queue.get() would return. I want the heartbeats to keep happening in the background while this request is blocked. So the same client has two asynchronous requests to the server in flight at once.
Here is a sample project I built which roughly simulates my issue.
Server:
import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen

q = Queue()


class HeartBeatHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def post(self):
        print("Heart beat")


class JobHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        print("Job")
        try:
            job = yield q.get(block=True, timeout=20)
            self.write(job)
        except Exception as e:
            self.write("No job")


def make_app():
    return tornado.web.Application([
        (r"/heartbeat", HeartBeatHandler),
        (r"/job", JobHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    try:
        tornado.ioloop.IOLoop.current().start()
    except KeyboardInterrupt:
        tornado.ioloop.IOLoop.current().stop()
Client:
import asyncio
from tornado import httpclient, gen


@gen.coroutine
def heartbeat_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient()
        heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
                                                   body="")
        try:
            yield http_client.fetch(heartbeat_request)
            yield asyncio.sleep(5)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))
        http_client.close()


@gen.coroutine
def worker_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
        job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
        try:
            response = yield http_client.fetch(job_request)
            print(response.body)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))
        http_client.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(heartbeat_routine())
    asyncio.ensure_future(worker_routine())
    loop.run_forever()
Questions:
If you use a thread-safe queue, you must not call its blocking operations from the IOLoop thread, because nothing else (including the heartbeat handler) can run while that thread is blocked. Instead, run them in a thread pool:
job = yield IOLoop.current().run_in_executor(None, lambda: q.get(block=True, timeout=20))
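To see the effect in isolation, here is a minimal sketch that needs no HTTP server. It assumes Tornado 5 or later (where IOLoop.run_in_executor exists and Tornado runs on the asyncio loop). The names get_job, ticker and demo are made up for illustration; ticker merely stands in for heartbeat handling.

```python
import asyncio
import queue

from tornado.ioloop import IOLoop

q = queue.Queue()  # thread-safe queue, as in the question


async def get_job(timeout):
    """Wait for a job in a worker thread; return None if none arrives in time."""
    try:
        return await IOLoop.current().run_in_executor(
            None, lambda: q.get(block=True, timeout=timeout)
        )
    except queue.Empty:
        return None


async def demo():
    ticks = []

    async def ticker():
        # Stands in for heartbeat handling: it only advances if the loop is free.
        for _ in range(3):
            ticks.append(1)
            await asyncio.sleep(0.05)

    # The blocking q.get() runs in the default thread pool, so ticker()
    # keeps running on the event loop while the job wait is pending.
    job, _ = await asyncio.gather(get_job(0.3), ticker())
    return job, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With an empty queue, the job wait times out and get_job returns None, while all three ticks still complete during the wait. Calling q.get() directly in the coroutine would instead stall the ticks until the timeout expired.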
Alternatively, you could use Tornado's async (but thread-unsafe) queue, and use IOLoop.add_callback whenever you need to interact with the queue from another thread.
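A rough sketch of that alternative, assuming Tornado 5 or later. The names wait_for_job, producer and demo are hypothetical; the producer thread stands in for whatever code outside the IOLoop thread creates jobs in your application.

```python
import asyncio
import datetime
import threading

from tornado.ioloop import IOLoop
from tornado.queues import Queue
from tornado.util import TimeoutError


async def wait_for_job(q, seconds):
    """Wait up to `seconds` for a job without blocking the IOLoop."""
    try:
        return await q.get(timeout=datetime.timedelta(seconds=seconds))
    except TimeoutError:
        return None


async def demo():
    q = Queue()  # Tornado's async queue; only touch it from the IOLoop thread
    loop = IOLoop.current()

    def producer():
        # Runs in another thread. add_callback is safe to call from any
        # thread and schedules put_nowait on the IOLoop thread.
        loop.add_callback(q.put_nowait, "job-1")

    threading.Thread(target=producer).start()
    return await wait_for_job(q, 2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a handler, the same wait_for_job call could replace the blocking q.get(), with a None result mapped to the "No job" response.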
There's some magic in the AsyncHTTPClient constructor, which tries to share existing instances when possible, but this means that constructor arguments are only effective the first time. The worker_routine is picking up the default instance created by heartbeat_routine. Add force_instance=True to ensure you get a fresh client in worker_routine (and call .close() on it when you're done).
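The sharing behaviour can be checked without making any requests. This is only a sketch: it reads the client's defaults attribute, which is an implementation detail of AsyncHTTPClient rather than documented API, and the function name demo is made up.

```python
import asyncio

from tornado import httpclient


async def demo():
    # First construction on this loop creates the shared instance.
    shared = httpclient.AsyncHTTPClient()
    # Same loop, so the cached instance is returned and `defaults` is ignored.
    ignored = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
    # force_instance=True bypasses the cache, so `defaults` takes effect.
    own = httpclient.AsyncHTTPClient(
        force_instance=True, defaults=dict(request_timeout=180)
    )
    try:
        return {
            "ignored_is_shared": ignored is shared,
            "own_is_shared": own is shared,
            "shared_timeout": shared.defaults["request_timeout"],
            "own_timeout": own.defaults["request_timeout"],
        }
    finally:
        own.close()  # a forced instance should be closed explicitly


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Only the forced instance should report the 180 second timeout; the shared one keeps whatever it was first created with.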