I am making requests to an external API, getting the response back and writing it to a file. Everything works/runs fine, however I receive the "Task was destroyed but it is pending!" warning that I'd like to clean up.
I have emulated the process below. I receive items from a source (e.g. a list) and, as I receive them, I put them into the queue, signaling the end of the list by putting in a sentinel None
value.
The write_task() consumer
continues to receive items from this queue until it receives the sentinel, and then breaks out.
Below is the code which will show that my write_task()
is cancelled before it is completed. What is the proper design to handle this?
import asyncio
import aiofiles
import aiocsv
import json
async def task(l: list, write_q: asyncio.Queue) -> None:
    """Producer: wrap each source item in a request dict and enqueue it.

    A ``None`` sentinel is enqueued after the last item so the consumer
    knows the source is exhausted.
    """
    for item in l:
        await write_q.put({
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": item,
        })
    # Sentinel value to signal we are done receiving from source
    await write_q.put(None)
async def write_task(write_q: asyncio.Queue) -> None:
    """Consumer: append queued items to file.csv until a None sentinel arrives.

    Writes a header row before the first data row, then one
    ``["200", json-payload]`` row per item, calling ``task_done()`` for
    every item taken from the queue (including the sentinel) so that
    ``write_q.join()`` can complete.
    """
    headers: bool = True
    # Open the file once for the whole run; the original re-entered the
    # context manager (reopening file.csv) for every single queue item.
    async with aiofiles.open("file.csv", mode="a+", newline='') as f:
        w = aiocsv.AsyncWriter(f)
        while True:
            # Get data out of the queue to write it
            data = await write_q.get()
            # Identity check for the sentinel: a falsy-but-valid payload
            # (e.g. an empty dict) must not terminate the consumer.
            if data is None:
                write_q.task_done()
                await f.flush()
                break
            if headers:
                await w.writerow([
                    "status",
                    "data",
                ])
                headers = False
            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(data)
            ])
            write_q.task_done()
async def main() -> None:
    """Wire a producer and a file-writing consumer to one shared queue.

    NOTE(review): this version reproduces the "Task was destroyed but it
    is pending!" warning — see the cancel() call below.
    """
    # Create fake data to POST
    items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 5
    # Queues for orchestrating
    write_q = asyncio.Queue()
    producer = asyncio.create_task(
        task(items, write_q)
    )
    consumer = asyncio.create_task(
        write_task(write_q)
    )
    # gather() with return_exceptions=True collects any producer failure
    # instead of raising it here.
    errors = await asyncio.gather(producer, return_exceptions=True)
    print(f"INFO: Producer has completed! exceptions: {errors}")
    # Wait for queue to empty and cancel the consumer
    await write_q.join()
    # join() returns as soon as task_done() has been called for every
    # item, but write_task() may still be awaiting its final flush and
    # file close at that point; cancelling it here instead of awaiting it
    # is what triggers "Task was destroyed but it is pending!".
    consumer.cancel()
    print("INFO: write consumer has completed! ")
    print("INFO: Complete!")
if __name__ == "__main__":
    # asyncio.run() creates the loop, runs main() to completion, and
    # closes the loop on exit; the manual new_event_loop() variant left
    # the loop unclosed.
    asyncio.run(main())
You should run
await consumer
instead of
consumer.cancel()
to wait for the consumer task to finish.
import asyncio
import aiofiles
import aiocsv
import json
async def task(l: list, write_q: asyncio.Queue) -> None:
    """Producer: wrap each source item in a request dict and enqueue it.

    A ``None`` sentinel is enqueued after the last item so the consumer
    knows the source is exhausted.
    """
    for item in l:
        await write_q.put({
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": item,
        })
    # Sentinel value to signal we are done receiving from source
    await write_q.put(None)
async def write_task(write_q: asyncio.Queue) -> None:
    """Consumer: append queued items to file.csv until a None sentinel arrives.

    Writes a header row before the first data row, then one
    ``["200", json-payload]`` row per item, calling ``task_done()`` for
    every item taken from the queue (including the sentinel) so that
    ``write_q.join()`` can complete.
    """
    headers: bool = True
    # Open the file once for the whole run; the original re-entered the
    # context manager (reopening file.csv) for every single queue item.
    async with aiofiles.open("file.csv", mode="a+", newline='') as f:
        w = aiocsv.AsyncWriter(f)
        while True:
            # Get data out of the queue to write it
            data = await write_q.get()
            # Identity check for the sentinel: a falsy-but-valid payload
            # (e.g. an empty dict) must not terminate the consumer.
            if data is None:
                write_q.task_done()
                await f.flush()
                break
            if headers:
                await w.writerow([
                    "status",
                    "data",
                ])
                headers = False
            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(data)
            ])
            write_q.task_done()
async def main() -> None:
    """Wire a producer and a file-writing consumer to one shared queue.

    Waits for the producer, drains the queue, and then *awaits* the
    consumer so it can finish its final flush and file close cleanly —
    this is what avoids "Task was destroyed but it is pending!".
    """
    # Create fake data to POST; annotation fixed — each item is itself a
    # list of strings, not a string.
    items: list[list[str]] = [["hello", "world"], ["asyncio", "test"]] * 5
    # Queues for orchestrating
    write_q = asyncio.Queue()
    producer = asyncio.create_task(
        task(items, write_q)
    )
    consumer = asyncio.create_task(
        write_task(write_q)
    )
    # gather() with return_exceptions=True collects any producer failure
    # instead of raising it here.
    errors = await asyncio.gather(producer, return_exceptions=True)
    print(f"INFO: Producer has completed! exceptions: {errors}")
    # Wait for queue to empty, then wait for the consumer to exit on the
    # sentinel rather than cancelling it while it is still pending.
    await write_q.join()
    await consumer
    print("INFO: write consumer has completed! ")
    print("INFO: Complete!")
if __name__ == "__main__":
    # asyncio.run() creates the loop, runs main() to completion, and
    # closes the loop on exit; the manual new_event_loop() variant left
    # the loop unclosed.
    asyncio.run(main())