Tags: python, asynchronous, concurrency, python-asyncio

Asynchronously running a function in the background while sending results in Python


I have a Python script that should a) continuously listen for requests from an API, and b) run an expensive algorithm continuously in the background. The algorithm's results are saved locally to a file (results.json), and if the API requests the result, I would like to load the current results from disk and send them to the endpoint.

I've provided a simplified sketch of the code logic below. However, I'm not sure how to structure it so that algo.run runs constantly in the background while Main.send periodically does the API calls. Since both are blocking operations, should I be using async def for the methods in Algorithm as well? In addition, how would I handle the edge case where Main.send tries to read the file from disk while algo.run is trying to save it at the same time?

Any suggestions would be greatly appreciated!

import json
import random
import time
import requests

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API

class Main:
    def __init__(self):
        self.algo = Algorithm()

        # self.send()      ## These would block
        # self.algo.run()
    
    async def send(self):

        # Continuously listen on an endpoint
        while True:

            response = requests.get(ENDPOINT).json()
            if response['should_send']:
    
                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)

                # Send results to API
                requests.post(ENDPOINT, outputs)
    
            time.sleep(60)

class Algorithm:
    def __init__(self):
        pass
    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result' : random.random()}
            time.sleep(60)

            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)

Solution

  • If your CPU-bound algorithm doesn't pause for I/O (or even if it does), simply run it in another thread.

    Asyncio can call code in other threads via asyncio.to_thread, which can be great (and even easier to use than concurrent.futures).
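    As a minimal, self-contained illustration of that helper (the function names here are made up for the demo): the blocking call runs in a worker thread while a coroutine keeps running on the event loop.

```python
import asyncio
import time

# "blocking_work" is a made-up stand-in for the expensive algorithm:
def blocking_work():
    time.sleep(0.1)            # a blocking call, safe inside a worker thread
    return "done in thread"

async def async_work():
    await asyncio.sleep(0.05)  # cooperative sleep; the loop stays free
    return "done in loop"

async def demo():
    # to_thread() wraps the blocking call in an awaitable that runs in a
    # worker thread, so the event loop is never blocked while it works:
    return await asyncio.gather(
        asyncio.to_thread(blocking_work),
        async_work(),
    )

print(asyncio.run(demo()))  # ['done in thread', 'done in loop']
```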

    In your case, this might work

    import json
    import random
    import time
    import asyncio
    import httpx  # (you will use this to replace requests)
    
    ENDPOINT = 'http://some_api.com/endpoint'  # Some external API
    
    # let's keep things simple: 
    # in Python things that don't need to be
    # a class, don't need to be a class! 
    
    async def main():
        
        # this now being async, it can orchestrate the 
        # lifetime of Algorithm classes:
        
        algo = Algorithm()
        
        # create the "send" task, which will then run
        # whenever the async loop in the main thread
        # is idle:
        send_task = asyncio.create_task(send())
        
        # sets a task that will run "algo" in another thread: 
        algo_task = asyncio.create_task(asyncio.to_thread(algo.run))
        
        # pass the control to the loop, staying idle and allowing
        # both tasks to run:
        await asyncio.gather(send_task, algo_task)
                                        
                                        
    
    async def send():
    
        # Continuously listen on an endpoint
        async with httpx.AsyncClient() as client:
            while True:
    
                # "requests" is not really an asynchronous lib
                # use httpx instead:
                # 
                response = (await client.get(ENDPOINT)).json()
                if response['should_send']:
    
                    # Load the result from disk
                    with open('results.json', 'r') as file:
                        outputs = json.load(file)
    
                    # Send results to API
                    await client.post(ENDPOINT, json=outputs)
                # pauses 60 seconds while allowing other
                # async tasks to run in the same thread:
                await asyncio.sleep(60)
    
    class Algorithm:
        def __init__(self):
            pass
        def run(self):
            # Some expensive computations, running repeatedly in background
            while True:
                outputs = {'result' : random.random()}
                # this is running in other thread, no problem using synchronous sleep:
                time.sleep(60)
    
                # Save result to disk
                with open('results.json', 'w') as file:
                    json.dump(outputs, file)
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    

    Actually, asyncio would not even be needed here; I just ended up using it because you mentioned it in the question and your send method was marked as asynchronous. The real deal is to have the non-cooperative "algo" code in a different thread.

    But the asyncio example can serve as a base for interleaving other I/O-bound tasks in the same project.
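    If you do drop asyncio entirely, the thread-only skeleton could look like the sketch below (names are illustrative, and the short sleeps are just for the demo; a threading.Event doubles as an interruptible sleep and a shutdown flag):

```python
import threading
import time

stop = threading.Event()
results = []

def algo_loop():
    # stands in for Algorithm.run: compute, then wait until the next round
    while not stop.is_set():
        results.append("computed")
        stop.wait(0.01)  # like time.sleep(), but wakes early on stop.set()

def run_demo():
    worker = threading.Thread(target=algo_loop, daemon=True)
    worker.start()
    time.sleep(0.05)  # the main thread would run the "send" loop here
    stop.set()        # request shutdown
    worker.join()

run_demo()
```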

    As for your other question: the file being read while it is simultaneously being written is of little concern. Since you are issuing separate open calls, the OS will do the right thing for you (on Windows you might get an OSError if you try to open the file while the other open is in progress, but macOS, Linux, and other POSIX systems will just work). Say a write hits just after the file was opened for reading: once the "open" operation has resolved, the program reads normally from the previous version of the file, even while the filesystem already shows the new version being written.

    If writing takes time, however, there is the risk of sending a partial file (or a zero-length file) in the converse case: when saving has started and is still ongoing as the file is opened for reading. If writing is fast, and sending the file can always wait for its completion, all you need is a lock (in this case a threading.Lock, since the two loops run in different threads):

    import json
    import random
    import time
    import asyncio
    import httpx  # (you will use this to replace requests)
    import threading
    
    ENDPOINT = 'http://some_api.com/endpoint'  # Some external API
    
    # let's keep things simple: 
    # in Python things that don't need to be
    # a class, don't need to be a class! 
    # (if "send" needed state of its own, a class could make sense for it)
    async def main():
        
        # this now being async, it can orchestrate the 
        # lifetime of Algorithm classes:
        
        lock = threading.Lock()
        
        algo = Algorithm(lock)
        
        # create the "send" task, which will then run
        # whenever the async loop in the main thread
        # is idle:
        send_task = asyncio.create_task(send(lock))
        
        # sets a task that will run "algo" in another thread: 
        algo_task = asyncio.create_task(asyncio.to_thread(algo.run))
        
        # pass the control to the loop, staying idle and allowing
        # both tasks to run:
        await asyncio.gather(send_task, algo_task)
                                        
                                        
    
    async def send(lock):
    
        # Continuously listen on an endpoint
        async with httpx.AsyncClient() as client:
            while True:
    
                # "requests" is not really an asynchronous lib
                # use httpx instead:
                # 
                response = (await client.get(ENDPOINT)).json()
                if response['should_send']:
    
                    # Load the result from disk
                    with lock, open('results.json', 'r') as file:
                        outputs = json.load(file)
    
                    # Send results to API
                    await client.post(ENDPOINT, json=outputs)
                # pauses 60 seconds while allowing other
                # async tasks to run in the same thread:
                await asyncio.sleep(60)
    
    class Algorithm:
        def __init__(self, lock):
            self.lock = lock
        def run(self):
            # Some expensive computations, running repeatedly in background
            while True:
                outputs = {'result' : random.random()}
                # this is running in other thread, no problem using synchronous sleep:
                time.sleep(60)
    
                # Save result to disk
                with self.lock, open('results.json', 'w') as file:
                    json.dump(outputs, file)
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    

    Otherwise, if writing to the file takes several seconds and you'd rather not wait, the pattern is to write to a different file name and then, once writing is done, have the writer task rename the new file over the old one, holding the lock only for the rename:

    import os
    ...
    
    class Algorithm:
    
        ...
        
        def run(self):
            while True:
                ...
                with open('new_results.json', 'w') as file:
                    json.dump(outputs, file)
                    
                with self.lock:
                    # os.replace also overwrites the target on Windows,
                    # where os.rename would raise an error:
                    os.replace("new_results.json", "results.json")
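    Because os.replace is atomic on the same filesystem, a variant of this pattern needs no lock at all: a reader always sees either the complete old file or the complete new one. A sketch of such a helper (the function name and the use of tempfile are my additions, not from the code above):

```python
import json
import os
import tempfile

def atomic_save(outputs, path="results.json"):
    # Write to a temporary file in the target's directory (same
    # filesystem, so the final replace is guaranteed atomic), then
    # swap it into place.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".",
                                    suffix=".json")
    try:
        with os.fdopen(fd, "w") as file:
            json.dump(outputs, file)
        # Readers see either the complete old file or the complete
        # new one, never a half-written mix:
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)  # don't leave the temp file behind on failure
        raise
```

    The lock then only matters if the reader must never see a slightly stale version of the results.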