Multiple Async HTTP connections to Tornado Server


I have a Tornado server which I am trying to make asynchronous. I have a client which makes two kinds of asynchronous requests to the server at the same time: it pings the server every 5 seconds with a heartbeat, and it makes a GET request for a job whenever it can.

On the server side, there is a thread-safe queue which contains jobs. A queue.get() call blocks for up to 20 seconds if the queue is empty. I want the server to hold the connection open for those 20 seconds and, if the call times out, write "No job" to the client. As soon as a job becomes available, it should be written to the client immediately, since queue.get() would return. I want the heartbeats to keep happening in the background while this request is blocked. So the same client has two asynchronous requests to the server in flight at once.

Here is a sample project I built which roughly simulates my issue.

Server:

import tornado.ioloop
import tornado.web
from queue import Queue
from tornado import gen

q = Queue()


class HeartBeatHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def post(self):
        print("Heart beat")


class JobHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        print("Job")
        try:
            job = yield q.get(block=True, timeout=20)
            self.write(job)
        except Exception as e:
            self.write("No job")


def make_app():
    return tornado.web.Application([
        (r"/heartbeat", HeartBeatHandler),
        (r"/job", JobHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    try:
        tornado.ioloop.IOLoop.current().start()
    except KeyboardInterrupt:
        tornado.ioloop.IOLoop.current().stop()

Client:

import asyncio
from tornado import httpclient, gen


@gen.coroutine
def heartbeat_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient()
        heartbeat_request = httpclient.HTTPRequest("http://{}/heartbeat".format("127.0.0.1:8888"), method="POST",
                                                   body="")
        try:
            yield http_client.fetch(heartbeat_request)
            yield asyncio.sleep(5)
        except httpclient.HTTPError as e:
            print("Heartbeat failed!\nError: {}".format(str(e)))

        http_client.close()


@gen.coroutine
def worker_routine():
    while True:
        http_client = httpclient.AsyncHTTPClient(defaults=dict(request_timeout=180))
        job_request = httpclient.HTTPRequest("http://{}/job".format("127.0.0.1:8888"), method="GET")
        try:
            response = yield http_client.fetch(job_request)
            print(response.body)
        except httpclient.HTTPError as e:
            print("Job request failed!\nError: {}".format(str(e)))

        http_client.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(heartbeat_routine())
    asyncio.ensure_future(worker_routine())
    loop.run_forever()

Questions:

  1. The heartbeats also block for those 20 seconds while queue.get() blocks, which I do not want.
  2. As you can see, in my client I set the request timeout to 180 seconds, but that never seems to take effect with Tornado. If you increase the queue.get() timeout above 20 seconds, the client reports an error saying the request timed out.

Solution

    1. If you use a thread-safe queue, you must not use blocking operations from the IOLoop thread. Instead, run them in a thread pool:

      job = yield IOLoop.current().run_in_executor(None, lambda: q.get(block=True, timeout=20))
      

      Alternatively, you could use Tornado's async (but not thread-safe) queue, tornado.queues.Queue, and use IOLoop.add_callback whenever you need to interact with the queue from another thread.

    2. There's some magic in the AsyncHTTPClient constructor, which tries to share existing instances when possible, but this means that constructor arguments are only effective the first time. The worker_routine is picking up the shared default instance created by heartbeat_routine, so its request_timeout=180 is ignored. Add force_instance=True to ensure you get a fresh client in worker_routine (and call .close() on it when you're done).
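
To illustrate point 1, here is a minimal sketch of the thread-pool approach. To stay self-contained it uses only the standard library: asyncio's loop.run_in_executor stands in for IOLoop.current().run_in_executor, and the names wait_for_job, heartbeat and demo, along with the shortened timings, are assumptions made for illustration rather than part of the original project.

```python
import asyncio
import queue

q = queue.Queue()  # thread-safe queue, as in the question


async def wait_for_job(timeout):
    """Wait for a job without blocking the event loop thread."""
    loop = asyncio.get_running_loop()
    try:
        # The blocking q.get() runs in the default thread pool executor,
        # so the event loop stays free to serve other coroutines.
        return await loop.run_in_executor(
            None, lambda: q.get(block=True, timeout=timeout)
        )
    except queue.Empty:
        # Raised by q.get() when the timeout expires with no job queued.
        return "No job"


async def heartbeat(count, interval, log):
    """Hypothetical stand-in for the heartbeat handler: record each beat."""
    for _ in range(count):
        log.append("heartbeat")
        await asyncio.sleep(interval)


async def demo():
    log = []
    # Start the long poll first, then run heartbeats while it is pending.
    job_task = asyncio.ensure_future(wait_for_job(timeout=0.5))
    await heartbeat(count=3, interval=0.05, log=log)
    still_waiting = not job_task.done()  # the long poll has not finished yet
    result = await job_task
    return log, still_waiting, result


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch all three heartbeats are recorded while the job request is still waiting, which is the behaviour the question asks for. Catching queue.Empty rather than a bare Exception keeps unrelated errors visible. Note that each pending request occupies one worker thread for up to the full timeout, so the size of the executor's pool limits how many long polls can wait at once.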