I have a lot of heavy jobs which need to send throttled HTTP requests (2 requests/sec) and calculate the results based on the response values.
I use throttler to control the request rate, and joblib to run the calculations across multiple processes:
import asyncio
from math import sqrt
import time

from joblib import Parallel, delayed
from throttler import throttle


@throttle(rate_limit=1, period=0.5)
async def request(i):
    """
    Do nothing. Just sleep to mimic HTTP requests.
    """
    time.sleep(0.3)
    print(f"get response: {i} {time.time()}")
    return i


def heavy_calculate(i):
    """
    Sqrt and square the number i 30_000_000 times to mimic a heavy job.
    It takes about 2.1 sec per call.
    """
    n = i
    for _ in range(30_000_000):
        n = sqrt(n)
        n = n ** 2
    print(f"heavy_calculate: {n} {time.time()}")
    return n
async def many_tasks(count: int):
    print(f"=== START === {time.time()}")
    with Parallel(n_jobs=-1) as parallel:
        a = []
        coros = [request(i) for i in range(count)]
        for coro in asyncio.as_completed(coros):
            i = await coro
            #! Note: this heavy_calculate call blocks the request rate too
            results = heavy_calculate(i)
            #! Tried the following call instead, but it raises "cannot unpack non-iterable function object"
            # results = parallel(delayed(heavy_calculate)(i))
            a.append(results)
    print(f"=== END === {time.time()}")
    print(a)


# Start run
asyncio.run(many_tasks(10))
The above code runs two functions:

request: mimics a throttled HTTP request (one call per 0.5 sec).
heavy_calculate: mimics a CPU-bound calculation that blocks other jobs.

I want to call request with throttling and run heavy_calculate on multiple processes, but it raises this error:
...
TypeError: cannot unpack non-iterable function object
Any suggestion?
You could use the asyncio loop.run_in_executor function to execute the CPU-intensive work on a multiprocessing pool, avoiding blocking the event loop:
import asyncio
from math import sqrt
from concurrent.futures import ProcessPoolExecutor

from throttler import throttle


@throttle(rate_limit=1, period=0.5)
async def fetch(i: int) -> int:
    await asyncio.sleep(0.3)
    return i


def heavy_work(i: int) -> int:
    for _ in range(30_000_000):
        _ = sqrt(i) ** 2
    return i


async def fetch_and_process(i: int, executor: ProcessPoolExecutor) -> int:
    i = await fetch(i)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, heavy_work, i)


async def main():
    with ProcessPoolExecutor() as executor:
        results = await asyncio.gather(
            *(fetch_and_process(i, executor) for i in range(20))
        )
        print(results)


if __name__ == "__main__":
    asyncio.run(main())
This runs many fetch_and_process calls concurrently, within the throttling limit applied to fetch, on as many CPU cores as your machine provides.
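As a side note on the original error: joblib's delayed(f)(i) returns a (function, args, kwargs) tuple, and Parallel expects an iterable of such tuples. Passing a single tuple makes Parallel iterate over the tuple's elements, so the first unpacking hits the bare function. A rough stdlib-only model of this behavior (delayed_like is a made-up stand-in, not joblib's actual code):

```python
# Rough model: delayed(f)(i) returns a (function, args, kwargs) tuple,
# and Parallel iterates its argument expecting a SEQUENCE of such tuples.
def delayed_like(func):
    def wrapper(*args, **kwargs):
        return (func, args, kwargs)
    return wrapper

task = delayed_like(len)("abc")  # a single (len, ("abc",), {}) tuple

# Iterating the tuple directly walks its elements, so the first unpack
# target is the bare function `len` -> the reported TypeError.
err = None
try:
    for func, args, kwargs in task:
        func(*args, **kwargs)
except TypeError as exc:
    err = exc
print(err)  # cannot unpack non-iterable ...

# Wrapping the tuple in a list (or generator) is what Parallel expects:
for func, args, kwargs in [task]:
    print(func(*args, **kwargs))  # prints 3
```

So `parallel([delayed(heavy_calculate)(i)])` would avoid the TypeError, though it would still run only one job at a time in that loop.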
Note that your request function has a flaw: use asyncio.sleep to simulate network latency rather than time.sleep, as the latter blocks the event loop.
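To see the difference, here is a minimal stdlib-only sketch (no throttler; the helper names are made up for the demo) that times five concurrent 0.3-second sleeps each way:

```python
import asyncio
import time


async def blocking_sleep(i: int) -> int:
    # time.sleep holds the whole event loop; no other coroutine runs meanwhile.
    time.sleep(0.3)
    return i


async def async_sleep(i: int) -> int:
    # asyncio.sleep yields to the event loop, so the coroutines overlap.
    await asyncio.sleep(0.3)
    return i


def timed_gather(coro_fn, n: int) -> float:
    """Run n copies of coro_fn concurrently and return the elapsed wall time."""
    async def runner():
        await asyncio.gather(*(coro_fn(i) for i in range(n)))

    start = time.perf_counter()
    asyncio.run(runner())
    return time.perf_counter() - start


print(f"time.sleep:    {timed_gather(blocking_sleep, 5):.2f}s")  # ~1.5s, serialized
print(f"asyncio.sleep: {timed_gather(async_sleep, 5):.2f}s")     # ~0.3s, overlapped
```

With time.sleep the five "requests" run back to back, so the throttle rate you configured is no longer the limiting factor.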