I have a program (an ASGI server) that is structured roughly like this:
import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")

async def lifespan():
    ctxvar.set("spam")

async def endpoint():
    # Fails: ctxvar was only set in the lifespan task's copy of the context
    assert ctxvar.get() == "spam"

async def main():
    ctx = contextvars.copy_context()
    task = asyncio.create_task(lifespan())
    await task
    task = asyncio.create_task(endpoint())
    await task

asyncio.run(main())
Because the lifespan event and the endpoints are run in separate tasks, they can't share contextvars.
This is by design: each task copies the current context when it is created, so the value that lifespan sets in ctxvar is visible only inside the lifespan task.
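To make the copying behavior concrete, here is a minimal sketch. The `default="unset"` argument and the `setter` coroutine are not part of my program; I added them only so the example can print a value instead of raising `LookupError`.

```python
import asyncio
import contextvars

# Assumption for illustration: a default, so .get() never raises LookupError
ctxvar = contextvars.ContextVar("ctx", default="unset")

async def setter():
    # Runs inside its own task, i.e. inside a copy of the caller's context
    ctxvar.set("spam")
    return ctxvar.get()

async def main():
    inside = await asyncio.create_task(setter())
    outside = ctxvar.get()  # the caller's context was never modified
    return inside, outside

inside, outside = asyncio.run(main())
print(inside, outside)  # spam unset
```

The task sees its own change, but the change does not propagate back to the code that created the task.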
This is the desired behavior for endpoints, but I would like for execution to appear like this (from a user's perspective):
async def lifespan():
    ctxvar.set("spam")
    await endpoint()
In other words, the endpoints are executed in their own independent context, but within the context of the lifespan.
I tried to get this to work by using contextvars.copy_context():
import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")

async def lifespan():
    ctxvar.set("spam")
    print("set")

async def endpoint():
    print("get")
    assert ctxvar.get() == "spam"

async def main():
    ctx = contextvars.copy_context()
    task = ctx.run(asyncio.create_task, lifespan())
    await task
    endpoint_ctx = ctx.copy()
    task = endpoint_ctx.run(asyncio.create_task, endpoint())
    await task

asyncio.run(main())
As well as:
async def main():
    ctx = contextvars.copy_context()
    task = asyncio.create_task(ctx.run(lifespan))
    await task
    endpoint_ctx = ctx.copy()
    task = asyncio.create_task(endpoint_ctx.run(endpoint))
    await task
However, it seems that contextvars.Context.run does not work this way (I guess the context is bound when the coroutine is created but not when it is executed).
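The following sketch reproduces both attempts and inspects `ctx` afterwards. As far as I can tell, `Context.run` only covers the synchronous call it wraps: `asyncio.create_task` still copies whatever context is current when the task is created, and `ctx.run(lifespan)` merely builds the coroutine object without running its body. The `default="unset"` argument is an assumption added so the result can be read back without an exception.

```python
import asyncio
import contextvars

# Assumption for illustration: a default, so .get() never raises LookupError
ctxvar = contextvars.ContextVar("ctx", default="unset")

async def lifespan():
    ctxvar.set("spam")

async def main():
    # Attempt 1: the task is created inside ctx, but it runs in a *copy* of ctx
    ctx = contextvars.copy_context()
    await ctx.run(asyncio.create_task, lifespan())
    first = ctx.run(ctxvar.get)

    # Attempt 2: ctx2.run only creates the coroutine object; its body executes
    # later, inside the task's own copy of main's context
    ctx2 = contextvars.copy_context()
    await asyncio.create_task(ctx2.run(lifespan))
    second = ctx2.run(ctxvar.get)
    return first, second

first, second = asyncio.run(main())
print(first, second)  # unset unset
```

In both cases the value set by `lifespan` never reaches the context object I hold on to, so copying that object for the endpoint cannot work.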
Is there a simple way to achieve the desired behavior, without restructuring how the tasks are being created or such?
This feature will be supported in Python 3.11: https://github.com/python/cpython/issues/91150
You will be able to write:
async def main():
    ctx = contextvars.copy_context()
    task = asyncio.create_task(lifespan(), context=ctx)
    await task
    endpoint_ctx = ctx.copy()
    task = asyncio.create_task(endpoint(), context=endpoint_ctx)
    await task
In the meantime, in current Python versions you will need a backport of this feature. I can't think of a good one, but a bad one is here.
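One possible workaround that needs no backport is sketched below. It is not the backport mentioned above, only an illustration under stated assumptions: `capture_context` is a hypothetical helper name, and `endpoint` returns the value instead of asserting so that the result can be checked. The idea is to have the lifespan task snapshot its own context once it has finished, then create each endpoint task from inside that snapshot, relying on the fact that a new task copies the context that is current at creation time.

```python
import asyncio
import contextvars

ctxvar = contextvars.ContextVar("ctx")

async def lifespan():
    ctxvar.set("spam")

async def endpoint():
    return ctxvar.get()

async def capture_context(coro):
    # Hypothetical helper: run coro in this task, then return a snapshot of
    # the task's context, including every variable coro has set
    await coro
    return contextvars.copy_context()

async def main():
    lifespan_ctx = await asyncio.create_task(capture_context(lifespan()))
    # create_task copies the context current at creation time, so each
    # endpoint task gets its own independent copy of lifespan_ctx
    task = lifespan_ctx.run(asyncio.create_task, endpoint())
    return await task

result = asyncio.run(main())
print(result)  # spam
```

The trade-off is that the lifespan coroutine has to be wrapped, so this does change how the lifespan task is created, and it only captures the context after the lifespan coroutine has completed.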