I'm working on a pattern where I communicate amongst multiple queues to process items along a pipeline. I am using sentinels to communicate between the queues when to stop working, however in the following code, I am seeing results that confuse me.
When reading from the write_q
in write_task()
I see the first value come in as the sentinel None
instead of the tasks in the order they were placed in response_task()
. If I understand right, write_task()
should receive the items in order and process them in order as the tasks are created.
Also, when printing the qsize()
in write_task()
after I find the sentinel, it says there are 0 items, however when printing back in main it seems that the qsize()
of write_q
still has 2 items. I've read somewhere that aiofiles
uses run_in_executor()
which means there might be a divergence of where the queue is handled.
Most of the below code is boilerplate to illustrate the actual scenario on why my code continues to block infinitely.
import asyncio
import aiofiles
import aiocsv
import json
async def fetch(t: float) -> float:
    """Simulate a network request: sleep for ``t`` seconds, then return ``t``.

    The original annotation said ``-> dict``, but the function returns the
    float it was given, so the annotation is corrected here.
    """
    print(f"INFO: Sleeping for {t}s")
    await asyncio.sleep(t)
    return t
async def task(l: list, request_q: asyncio.Queue) -> None:
    """Producer: schedule one fetch per item and enqueue the pending tasks.

    A single ``None`` sentinel is enqueued afterwards to tell the consumers
    that the source is exhausted.
    """
    for delay in l:
        pending = asyncio.create_task(fetch(delay))
        await request_q.put(pending)
    # Done producing - signal downstream with a sentinel.
    await request_q.put(None)
async def request_task(request_q: asyncio.Queue, write_q: asyncio.Queue) -> None:
    """Consumer: await each queued task and forward its result to write_q.

    On receiving the ``None`` sentinel the sentinel is acknowledged and then
    re-enqueued so the remaining consumers also get a chance to shut down.
    """
    while True:
        req = await request_q.get()
        if not req:
            print("INFO: received sentinel from request_q")
            request_q.task_done()
            # Pass the sentinel along for the other consumers.
            await request_q.put(None)
            return
        # Resolve the pending fetch and hand its result downstream.
        resp = await req
        await write_q.put(resp)
        request_q.task_done()
async def write_task(write_q: asyncio.Queue) -> None:
    """Consumer: drain write_q forever, appending each item as a CSV row.

    NOTE(review): the sentinel check below is commented out, so this loop
    has no exit condition - it blocks on ``get()`` once the queue is empty.
    """
    need_header: bool = True
    async with aiofiles.open("file.csv", mode="w+", newline='') as f:
        w = aiocsv.AsyncWriter(f)
        while True:
            data = await write_q.get()
            print(data)
            # if not data:
            #     print(f"INFO: Found sentinel in write_task, queue size was: {write_q.qsize()}")
            #     write_q.task_done()
            #     await f.flush()
            #     break
            if need_header:
                # Emit the header once, before the first data row.
                await w.writerow([
                    "status",
                    "data",
                ])
                need_header = False
            # Write the data from the response.
            await w.writerow([
                "200",
                json.dumps(data)
            ])
            await f.flush()
            write_q.task_done()
async def main() -> None:
    """Wire up producer -> request consumers -> write consumer and wait."""
    # Fake payloads (sleep durations) standing in for data to POST.
    items: list[float] = [.2, .5, 1]
    # Queues for orchestrating the pipeline.
    request_q = asyncio.Queue()
    write_q = asyncio.Queue()
    # Single producer feeding the request queue.
    producer = asyncio.create_task(
        task(items, request_q)
    )
    # Two request consumers (the original comment said 5, but range(2) is used).
    request_consumers = [
        asyncio.create_task(
            request_task(request_q, write_q)
        )
        for _ in range(2)
    ]
    # One write consumer.
    write_consumer = asyncio.create_task(
        write_task(write_q)
    )
    errors = await asyncio.gather(producer, return_exceptions=True)
    print(f"INFO: Producer has completed! exceptions: {errors}")
    # NOTE(review): each consumer re-enqueues the sentinel after task_done(),
    # so after the last consumer exits one un-acknowledged None remains in
    # request_q and this join() appears to never return - verify.
    await request_q.join()
    for c in request_consumers:
        c.cancel()
    print("INFO: request consumer has completed! ")
    print(f"INFO: write_q in main qsize: {write_q.qsize()}")
    await write_q.join()
    print("INFO: write queue has completed! ")
    # await write_consumer
    write_consumer.cancel()
    print("INFO: Complete!")
if __name__ == "__main__":
    # Alternative manual-loop form, kept for reference:
    # loop = asyncio.new_event_loop()
    # loop.run_until_complete(main())
    asyncio.run(main())
My first comment is that your code is far more complicated than it needs to be. But the real problem is that you have N fetches you need to do (where N is the length of your items
list) and M tasks (M is 2 in your posted code, although the comment says 5) performing these N fetches concurrently, but these tasks cannot be assumed to complete in an order corresponding to your items
list.
I believe the simplest solution is to pre-allocate a results
list of length N and each fetch request is passed this list and an index indicating where the result is to go in that list. You cannot start writing your CSV file until all fetches have been completed if you want the file rows to correspond to the items
input list. You only need one queue!
For demo purposes I have renamed items
to the more descriptive data_list
and have initialized it as:
data_list: list[str] = [[f"hello{i}", f"world{i}"] for i in range(10)]
I have also modified fetch
to simulate fetching data by just returning req['data']
. Consequently, when the program completes, the contents of file.csv should hold the ten data values in their original input order.
I have also renamed some of your functions and variables to more descriptive ones:
import asyncio
import aiohttp
import aiofiles
import aiocsv
import json
N_REQUEST_TASKS = 5
async def fetch(req: dict, results: list, idx: int) -> None:
    """Perform one request and store the outcome at ``results[idx]``.

    Nothing is returned; writing into the pre-allocated results list makes
    the final output order independent of task completion order. (The
    original annotation said ``-> dict`` but no value is ever returned.)
    """
    # Make the request
    # For demo purposes: sleep a random interval and echo the payload back;
    # the aiohttp code below the early return is unreachable in this demo.
    import random
    await asyncio.sleep(random.random())
    result = req['data']
    print(f"INFO returning {result} at index {idx}")
    results[idx] = result
    return
    async with aiohttp.ClientSession() as session:
        try:
            async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
                result = await response.json()
                response.raise_for_status()
                print(f"INFO: response status was: {response.status}")
                # Put response into queue to be written to file
        except Exception as err:
            print(f"ERROR: error making request: {err}")
            result = err
        finally:
            # Success or failure, record the outcome at the caller's index.
            print(f"INFO returning {result} at index {idx}")
            results[idx] = result
async def create_requests(data_list: list, results: list, request_q: asyncio.Queue) -> None:
    """Producer: enqueue one (request, results, index) triple per datum,
    then one ``None`` sentinel per consumer task."""
    for idx, data in enumerate(data_list):
        req: dict = {
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": data
        }
        await request_q.put((req, results, idx))
    # Every consumer needs its own sentinel to know when to stop.
    for _ in range(N_REQUEST_TASKS):
        await request_q.put(None)
async def request_task(request_q: asyncio.Queue) -> None:
    """Consumer: unpack each queued work item and run fetch() for it;
    a ``None`` sentinel terminates the loop."""
    while True:
        work = await request_q.get()
        if not work:
            print("INFO: received sentinel from request_q")
            return
        # Unpack the work item and perform the fetch, which stores its own
        # result at the given index.
        req, results, idx = work
        print(f"INFO: request in request_task: {req['data']}")
        await fetch(req, results, idx)
async def writer(results: list) -> None:
    """Write a header plus one CSV row per result to file.csv, skipping
    entries that are Exceptions (failed fetches)."""
    async with aiofiles.open("file.csv", mode="w", newline='') as f:
        csv_out = aiocsv.AsyncWriter(f)
        # Header row first.
        await csv_out.writerow([
            "status",
            "data",
        ])
        for result in results:
            print(f"INFO: data in write_task: {result}")
            if isinstance(result, Exception):
                # Failed fetch - nothing to write.
                continue
            # Write the data from the response.
            await csv_out.writerow([
                "200",
                json.dumps(result)
            ])
            await f.flush()
async def main() -> None:
    """Build the ordered results list with a pool of request consumers,
    then write it out in one pass."""
    # Create fake data to POST
    data_list: list[list[str]] = [[f"hello{i}", f"world{i}"] for i in range(10)]
    # Preallocate results so each fetch can drop its value at its own index,
    # keeping output order independent of completion order.
    results = [None] * len(data_list)
    # Request queue
    request_q = asyncio.Queue()
    # One producer plus N_REQUEST_TASKS consumers.
    tasks = [
        asyncio.create_task(
            create_requests(data_list, results, request_q)
        )
    ]
    tasks.extend(
        asyncio.create_task(request_task(request_q))
        for _ in range(N_REQUEST_TASKS)
    )
    await asyncio.gather(*tasks)
    print(f"INFO: Results have been produced")
    await writer(results)
    print("INFO: writer has completed! ")
if __name__ == "__main__":
    # Script entry point.
    asyncio.run(main())
Prints:
INFO: request in request_task: ['hello0', 'world0']
INFO: request in request_task: ['hello1', 'world1']
INFO: request in request_task: ['hello2', 'world2']
INFO: request in request_task: ['hello3', 'world3']
INFO: request in request_task: ['hello4', 'world4']
INFO returning ['hello1', 'world1'] at index 1
INFO: request in request_task: ['hello5', 'world5']
INFO returning ['hello0', 'world0'] at index 0
INFO: request in request_task: ['hello6', 'world6']
INFO returning ['hello6', 'world6'] at index 6
INFO: request in request_task: ['hello7', 'world7']
INFO returning ['hello2', 'world2'] at index 2
INFO: request in request_task: ['hello8', 'world8']
INFO returning ['hello3', 'world3'] at index 3
INFO: request in request_task: ['hello9', 'world9']
INFO returning ['hello4', 'world4'] at index 4
INFO: received sentinel from request_q
INFO returning ['hello9', 'world9'] at index 9
INFO: received sentinel from request_q
INFO returning ['hello5', 'world5'] at index 5
INFO: received sentinel from request_q
INFO returning ['hello7', 'world7'] at index 7
INFO: received sentinel from request_q
INFO returning ['hello8', 'world8'] at index 8
INFO: received sentinel from request_q
INFO: Results have been produced
INFO: data in write_task: ['hello0', 'world0']
INFO: data in write_task: ['hello1', 'world1']
INFO: data in write_task: ['hello2', 'world2']
INFO: data in write_task: ['hello3', 'world3']
INFO: data in write_task: ['hello4', 'world4']
INFO: data in write_task: ['hello5', 'world5']
INFO: data in write_task: ['hello6', 'world6']
INFO: data in write_task: ['hello7', 'world7']
INFO: data in write_task: ['hello8', 'world8']
INFO: data in write_task: ['hello9', 'world9']
INFO: writer has completed!
Update
If you do not care about the completion order of fetch tasks and you want to append rows as soon as data has been fetched, then using two queues is the simplest approach as follows:
import asyncio
import aiohttp
import aiofiles
import aiocsv
import json
N_REQUEST_TASKS = 5
async def fetch(req: dict) -> dict:
    """Execute one request and return the parsed result, or the Exception
    raised if the request failed.

    The demo stub at the top sleeps a random interval and echoes back
    ``req['data']``; the aiohttp code after the early return never runs.
    """
    # Make the request
    # For demo purposes:
    import random
    await asyncio.sleep(random.random())
    outcome = req['data']
    print(f"INFO returning {outcome}")
    return outcome
    # --- real implementation (unreachable in this demo) ---
    async with aiohttp.ClientSession() as session:
        try:
            async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
                outcome = await response.json()
                response.raise_for_status()
                print(f"INFO: response status was: {response.status}")
                # Put response into queue to be written to file
        except Exception as err:
            print(f"ERROR: error making request: {err}")
            outcome = err
        finally:
            # Success or failure, report and return whatever we ended up with.
            print(f"INFO returning {outcome}")
            return outcome
async def create_requests(data_list: list, request_q: asyncio.Queue) -> None:
    """Producer: build one request dict per datum and enqueue it, followed
    by one ``None`` sentinel per consumer task."""
    for data in data_list:
        await request_q.put({
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": data
        })
    # Every consumer needs its own sentinel to know when to stop.
    for _ in range(N_REQUEST_TASKS):
        await request_q.put(None)
async def request_task(request_q: asyncio.Queue, writer_q: asyncio.Queue) -> None:
    """Consumer: fetch each queued request and push its result onto the
    writer queue; a ``None`` sentinel ends the loop."""
    while True:
        req = await request_q.get()
        if not req:
            print("INFO: received sentinel from request_q")
            return
        print(f"INFO: request in request_task: {req['data']}")
        # fetch() completes first, then its result goes to the writer.
        await writer_q.put(await fetch(req))
async def writer(writer_q) -> None:
    """Drain writer_q, appending one CSV row per result until the ``None``
    sentinel arrives; Exception results are skipped."""
    async with aiofiles.open("file.csv", mode="w", newline='') as f:
        csv_out = aiocsv.AsyncWriter(f)
        # Header row first.
        await csv_out.writerow([
            "status",
            "data",
        ])
        while (result := await writer_q.get()) is not None:
            print(f"INFO: data in write_task: {result}")
            if isinstance(result, Exception):
                # Failed fetch - nothing to write.
                continue
            # Write the data from the response.
            await csv_out.writerow([
                "200",
                json.dumps(result)
            ])
            await f.flush()
async def main() -> None:
    """Run one producer and N_REQUEST_TASKS consumers against the request
    queue while a single writer task streams results to file.csv."""
    # Create fake data to POST
    data_list: list[list[str]] = [[f"hello{i}", f"world{i}"] for i in range(10)]
    # Request queue
    request_q = asyncio.Queue()
    # Writer queue
    writer_q = asyncio.Queue()
    # One producer plus N_REQUEST_TASKS consumers.
    tasks = [
        asyncio.create_task(
            create_requests(data_list, request_q)
        )
    ]
    tasks.extend(
        asyncio.create_task(request_task(request_q, writer_q))
        for _ in range(N_REQUEST_TASKS)
    )
    writer_task = asyncio.create_task(
        writer(writer_q)
    )
    await asyncio.gather(*tasks)
    print(f"INFO: Results have been produced")
    # Put sentinel to get writer to quit:
    await writer_q.put(None)
    await writer_task
    print("INFO: writer has completed! ")
if __name__ == "__main__":
    # Script entry point.
    asyncio.run(main())