I have a Python script which I would like to a) continuously listen for requests from an API, and b) run an expensive algorithm continuously in the background. The results from the algorithm are saved locally to a file (results.json), and if the API requests the result, I would like to load the current results from disk and send them to the endpoint.
I've provided a simplified sketch of the code logic below. However, I'm not sure how to structure it so that algo.run runs in the background constantly, while Main.send periodically does the API calls. Since both are blocking operations, should I be using async def in the methods of Algorithm as well? In addition, how would I handle the edge case where Main.send tries to read the file from disk while algo.run is trying to save the file at the same time?
Any suggestions would be greatly appreciated!
import json
import random
import time

import requests

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API


class Main:
    def __init__(self):
        algo = Algorithm()
        # self.send()  ## These would block
        # algo.run()

    async def send(self):
        # Continuously listen on an endpoint
        while True:
            response = requests.get(ENDPOINT).json()
            if response['should_send']:
                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)
                # Send results to API
                requests.post(ENDPOINT, outputs)
            time.sleep(60)


class Algorithm:
    def __init__(self):
        pass

    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result': random.random()}
            time.sleep(60)
            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)
If your CPU-bound algorithm doesn't pause for I/O (or even if it does), simply run it in another thread.
Asyncio has some functionality to call code in other threads, which can be great (and is even easier to use than concurrent.futures).
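For comparison, here is a minimal sketch of what asyncio.to_thread does for you under the hood: the same hand-off written explicitly with a concurrent.futures.ThreadPoolExecutor and loop.run_in_executor (blocking_work is just a placeholder for any blocking call):

import asyncio
import concurrent.futures
import time


def blocking_work():
    # stand-in for any CPU-bound or otherwise blocking call
    time.sleep(1)
    return 42


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # the event loop stays free while the worker thread is busy
        result = await loop.run_in_executor(pool, blocking_work)
    # asyncio.to_thread(blocking_work) is the one-line equivalent
    print(result)


asyncio.run(main())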
In your case, this might work:
import json
import random
import time
import asyncio

import httpx  # (you will use this to replace requests)

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API

# let's keep things simple:
# in Python, things that don't need to be
# a class don't need to be a class!

async def main():
    # this now being async, it can orchestrate the
    # lifetime of Algorithm instances:
    algo = Algorithm()
    # create the "send" task, which will then run
    # whenever the async loop in the main thread is idle!
    send_task = asyncio.create_task(send())
    # set up a task that will run "algo" in another thread:
    algo_task = asyncio.create_task(asyncio.to_thread(algo.run))
    # pass control to the loop, staying idle and allowing
    # both tasks to run:
    await asyncio.gather(send_task, algo_task)

async def send():
    # Continuously listen on an endpoint.
    # "requests" is not really an asynchronous lib -
    # use httpx instead:
    async with httpx.AsyncClient() as client:
        while True:
            response = (await client.get(ENDPOINT)).json()
            if response['should_send']:
                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)
                # Send results to API
                await client.post(ENDPOINT, json=outputs)
            # pause 60 seconds while allowing other
            # async tasks to run in the same thread:
            await asyncio.sleep(60)

class Algorithm:
    def __init__(self):
        pass

    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result': random.random()}
            # this runs in another thread, so no problem
            # using synchronous sleep:
            time.sleep(60)
            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)

if __name__ == "__main__":
    asyncio.run(main())
Actually, here, asyncio would not even be needed; I just ended up using it because you mentioned it in the question and your send method was marked as asynchronous. The real deal is to have the non-cooperative "algo" code in a different thread.
But the asyncio example can be the base for you to interleave other I/O-bound tasks in the same project.
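For reference, a minimal sketch of that threads-only variant, with no asyncio at all; it reuses the Algorithm class from the snippet above unchanged, and keeps requests as the HTTP client, which is fine once its blocking no longer stalls anything else:

import json
import threading
import time

import requests

ENDPOINT = 'http://some_api.com/endpoint'

def send():
    # runs in the main thread; blocking calls are fine here
    while True:
        response = requests.get(ENDPOINT).json()
        if response['should_send']:
            with open('results.json', 'r') as file:
                outputs = json.load(file)
            requests.post(ENDPOINT, json=outputs)
        time.sleep(60)

def main():
    algo = Algorithm()  # the Algorithm class from the example above
    # daemon=True: the worker thread dies together with the main thread
    threading.Thread(target=algo.run, daemon=True).start()
    send()

if __name__ == "__main__":
    main()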
As for your other question: the file being written while it is simultaneously read is of less concern than you might think. Since you are issuing separate open calls, the OS will do the right thing for you (ok, Windows might give you an OSError if you try to open the file while the other open is in progress, but macOS, Linux, and all other conceivable OSes will just work):
Let's say a write hits just after the file was opened for reading: if the "open" operation has already resolved, the program will read normally from the previous version of the file, even while the filesystem shows the new version as it is being written.
If writing takes time, however, there is a risk of sending a partial file (or a 0-length file) in the converse case: when saving has started and is ongoing, and the file is then opened for reading. If writing is fast, and sending the file can always wait for its completion, all you need is a lock (in this case, a threading.Lock):
import json
import random
import time
import asyncio
import threading

import httpx  # (you will use this to replace requests)

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API

# let's keep things simple:
# in Python, things that don't need to be
# a class don't need to be a class!

async def main():
    # this now being async, it can orchestrate the
    # lifetime of Algorithm instances:
    lock = threading.Lock()
    algo = Algorithm(lock)
    # create the "send" task, which will then run
    # whenever the async loop in the main thread is idle!
    send_task = asyncio.create_task(send(lock))
    # set up a task that will run "algo" in another thread:
    algo_task = asyncio.create_task(asyncio.to_thread(algo.run))
    # pass control to the loop, staying idle and allowing
    # both tasks to run:
    await asyncio.gather(send_task, algo_task)

async def send(lock):
    # Continuously listen on an endpoint
    async with httpx.AsyncClient() as client:
        while True:
            response = (await client.get(ENDPOINT)).json()
            if response['should_send']:
                # Load the result from disk, holding the lock so
                # the writer can't truncate the file mid-read:
                with lock, open('results.json', 'r') as file:
                    outputs = json.load(file)
                # Send results to API
                await client.post(ENDPOINT, json=outputs)
            # pause 60 seconds while allowing other
            # async tasks to run in the same thread:
            await asyncio.sleep(60)

class Algorithm:
    def __init__(self, lock):
        self.lock = lock

    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result': random.random()}
            # this runs in another thread, so no problem
            # using synchronous sleep:
            time.sleep(60)
            # Save result to disk, holding the same lock:
            with self.lock, open('results.json', 'w') as file:
                json.dump(outputs, file)

if __name__ == "__main__":
    asyncio.run(main())
Otherwise, if writing the file takes several seconds and you'd rather not wait, the pattern is to write to a different file name and then, once writing is done, have the writer task rename the new file over the old one, using the lock mechanism:
import os

...

class Algorithm:
    ...
    def run(self):
        while True:
            ...
            with open('new_results.json', 'w') as file:
                json.dump(outputs, file)
            with self.lock:
                # os.replace overwrites the target even on Windows,
                # where os.rename would raise an error if it exists
                os.replace("new_results.json", "results.json")
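(A side note on that last pattern: on POSIX systems os.replace is atomic, so a reader opening results.json will always see either the complete old file or the complete new one; the lock mainly covers platforms where that guarantee is weaker.)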