python, python-asyncio

Python Asyncio: How to ensure async jobs are executed sequentially from requests


I have a Django endpoint that needs to do some work when called (writing to a file); however, I would like that work to be serialised with any other calls to the same endpoint happening at the same time. That is, if the endpoint is called 10 times at the same moment, it should write to the file 10 times, but one write at a time, in the order the requests were received.

I have tried implementing various solutions with asyncio.new_event_loop() and run_in_executor(), but when I test this by sending several requests at once, I haven't been able to get it to truly execute the workers one at a time on the event loop. How can I achieve this? I am basically looking for a task queue, but one where each process that submits a task has no knowledge of what is already on the queue. I was hoping I could use a single event loop, submit workers to it, and have it run them one at a time, but from my tests that does not seem to be the case.

Here is my current code:

        append = file.read()

        logger.info(
            f"{request_number} Appending {file.size} bytes to file"
        )

        def append_to_file():
            @retry(tries=3, backoff=3)
            def append_to_file_with_retry():
                with default_storage.open(file_name, "a+b") as f:
                    logger.info(f"Starting write @ {request_number}")
                    f.write(b"\n" + append)
                    logger.info(f"Finished write @ {request_number}")

            append_to_file_with_retry()

        self.get_sync_event_loop().run_in_executor(None, append_to_file)
        logger.info(f"Sending response @ {request_number}")
        return Response(None, status=status.HTTP_200_OK)
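
I suspect the problem is run_in_executor(None, ...): passing None uses the event loop's default executor, which is a ThreadPoolExecutor with several worker threads, so jobs submitted at the same time can run concurrently rather than one at a time. Here is a minimal standalone sketch (with a hypothetical work() function standing in for the file write) that reproduces the interleaving I am seeing:

        import asyncio
        import threading
        import time

        def work(n):
            # Stand-in for the file write; with several worker threads
            # the start/finish messages from different jobs interleave.
            print(f"start {n} on {threading.current_thread().name}")
            time.sleep(0.1)
            print(f"finish {n}")

        async def main():
            loop = asyncio.get_running_loop()
            # None -> the loop's default ThreadPoolExecutor (multiple
            # workers), so the ten jobs do NOT run one at a time.
            await asyncio.gather(
                *(loop.run_in_executor(None, work, n) for n in range(10))
            )

        asyncio.run(main())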

Solution

  • I have figured this out.

    from concurrent.futures import ThreadPoolExecutor

    class Example:
        # A single shared executor with one worker thread: submitted
        # jobs are queued and run one at a time, in submission order.
        executor = ThreadPoolExecutor(max_workers=1)

        def endpoint(self):
            ...
            ...run_in_executor(self.executor, append_to_file)
    

    Using a thread pool with a maximum of one worker guarantees that only one task executes at a time; any further submissions wait in the executor's queue and run in the order they were submitted.
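
    For completeness, here is a minimal self-contained sketch of the pattern (the work() stand-in and the handle_request() wrapper are hypothetical, not the real view code):

        import asyncio
        import threading
        import time
        from concurrent.futures import ThreadPoolExecutor

        # Module-level and shared across requests: one worker thread means
        # submitted jobs queue up and run strictly one at a time.
        executor = ThreadPoolExecutor(max_workers=1)

        def work(n):
            # Stand-in for the real append_to_file; the start/finish
            # messages never interleave.
            print(f"start {n} on {threading.current_thread().name}")
            time.sleep(0.1)
            print(f"finish {n}")

        async def handle_request(n):
            loop = asyncio.get_running_loop()
            # The executor's FIFO queue preserves submission order.
            await loop.run_in_executor(executor, work, n)

        async def main():
            # Ten "simultaneous" requests still write one at a time.
            await asyncio.gather(*(handle_request(n) for n in range(10)))

        asyncio.run(main())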