So, how do I run async tasks properly to achieve this goal?
What does `RuntimeError: await wasn't used with future`
(see the traceback below) mean, and how can I fix it?
I have already tried:
async_to_sync (from asgiref):
This option makes it possible to run asyncio coroutines, but the retry functionality doesn't work.
Same problem as with asgiref: this option makes it possible to run asyncio coroutines, but the retry functionality doesn't work.
I also tried to write my own decorator, similar to async_to_sync, that runs coroutines thread-safely (asyncio.run_coroutine_threadsafe),
but I got the same behavior as described above.
I have also tried a blocking call such as asyncio.get_event_loop().run_until_complete()
(together with self.retry(...)) inside the Celery task.
This works in part: the tasks run and the retries work, but the coroutine is executed incorrectly - inside the async
function I cannot use aioredis.
Implementation notes:
celery -A celery_test.celery_app worker -l info -n worker1 -P gevent --concurrency=10 --without-gossip --without-mingle
# Redis database 9 on localhost is used as both the broker and the result backend.
transport = "redis://localhost/9"
# NOTE(review): the original call continued after `backend=transport,` with arguments
# that are not visible here - restore them if the application needs them.
celery_app = Celery("worker", broker=transport, backend=transport)
celery_app.conf.broker_transport_options = {
    'visibility_timeout': 60 * 60 * 24,  # 24 hours, in seconds
    'fanout_prefix': True,
    'fanout_patterns': True,
}
def temp_asyncio_loop():
    """Return a context manager that provides a fresh, temporary asyncio event loop.

    On entry a new event loop is created and installed as the current loop of
    the calling thread; it is the value bound by ``with ... as``.  On exit
    (normal or via an exception) the temporary loop is closed and the loop that
    was current beforehand, if any, is reinstated.

    :return: a context manager yielding the temporary event loop
    """
    # Imported locally so this definition does not depend on the file's import block.
    from contextlib import contextmanager

    @contextmanager
    def _scoped_loop():
        # asyncio.get_event_loop() automatically creates event loop only for main thread
        try:
            prev_loop = asyncio.get_event_loop()
        except RuntimeError:
            prev_loop = None
        loop = asyncio.new_event_loop()
        # Make the new loop current so code calling asyncio.get_event_loop()
        # inside the ``with`` body picks it up.
        asyncio.set_event_loop(loop)
        try:
            yield loop
        finally:
            # Always release the loop's resources and put the previous loop
            # (or None) back, even when the body raised.
            loop.close()
            asyncio.set_event_loop(prev_loop)

    return _scoped_loop()
def with_temp_asyncio_loop(f):
    """Decorate *f* so that every call runs inside ``temp_asyncio_loop()``.

    The temporary loop is passed to *f* as the keyword argument ``loop``; all
    other positional and keyword arguments are forwarded unchanged.

    :param f: callable accepting a ``loop`` keyword argument
    :return: wrapper with the same name and docstring as *f*
    """
    # Imported locally so this definition does not depend on the file's import block.
    from functools import wraps

    # wraps() keeps __name__/__doc__ of the original, so anything stacked on
    # top of this decorator still sees the real function name, not "wrapper".
    @wraps(f)
    def wrapper(*args, **kwargs):
        with temp_asyncio_loop() as t_loop:
            return f(*args, loop=t_loop, **kwargs)

    return wrapper
def await_(coro):
    """Run *coro* to completion on the current event loop and return its result.

    This is a blocking call: it obtains the loop via ``asyncio.get_event_loop()``
    and drives it with ``run_until_complete``.

    :param coro: coroutine (or other awaitable accepted by ``run_until_complete``)
    :return: whatever *coro* returns
    """
    return asyncio.get_event_loop().run_until_complete(coro)
@celery_app.task(bind=True, max_retries=30, default_retry_delay=0)
def debug(self, **kwargs):
except Exception as exc:
async def debug_async():
async with RedisLock(f'redis_lock_{}'):
class RedisLockException(Exception):
    """Raised when a Redis lock cannot be acquired because its key already exists."""
class RedisLock(AsyncContextManager):
    """Redis Lock class, usable as an async context manager.

    The lock is a single Redis key, read and written through the module-level
    ``redis`` client.  Entering the context creates the key (failing if it
    already exists); leaving the context releases it.

    :param lock_id: string (unique key)
    :param value: dummy value stored under the key
    :param expire: int (time in seconds that the key is kept while the lock is held)
    :param expire_on_delete: int (time in seconds the key is kept after release
        instead of being deleted immediately); ``None`` deletes it right away

    Usage::

        try:
            async with RedisLock('123_lock', expire=5 * 60):
                ...  # do something
        except RedisLockException:
            ...  # somebody else holds the lock
    """

    def __init__(self, lock_id: str, value='1', expire: int = 4, expire_on_delete: int = None):
        self.lock_id = lock_id
        self.expire = expire
        self.value = value
        self.expire_on_delete = expire_on_delete

    async def acquire_lock(self):
        """Try to create the lock key; return True on success, False if it already exists."""
        # SET with NX + EX creates the key and its TTL in one command, so a
        # crash between "create" and "expire" cannot leave a lock that never expires.
        return bool(await redis.set(self.lock_id, self.value, nx=True, ex=self.expire))

    async def release_lock(self):
        """Delete the lock key, or let it linger for ``expire_on_delete`` seconds if that is set."""
        if self.expire_on_delete is None:
            return await redis.delete(self.lock_id)
        await redis.expire(self.lock_id, self.expire_on_delete)

    async def __aenter__(self, *args, **kwargs):
        """Acquire the lock.

        :raises RedisLockException: if the key already exists
        :return: this lock instance, so ``async with RedisLock(...) as lock`` works
        """
        if not await self.acquire_lock():
            raise RedisLockException({
                'redis_lock': 'The process: {} still run, try again later'.format(await redis.get(self.lock_id))
            })
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Release the lock; exceptions raised in the body are not suppressed."""
        await self.release_lock()
On my Windows machine, await redis.setnx(...)
blocks the Celery worker: it stops producing logs, and Ctrl+C
doesn't work.
Inside the Docker container, I receive an error. Here is part of the traceback:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/aioredis/", line 854, in read_response
response = await self._parser.read_response()
File "/usr/local/lib/python3.9/site-packages/aioredis/", line 366, in read_response
aioredis.exceptions.ConnectionError: Connection closed by server.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/celery/app/", line 451, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/", line 734, in __protected_call__
return*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/", line 54, in run
ret = task.retry(exc=exc, **retry_kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/app/", line 717, in retry
File "/usr/local/lib/python3.9/site-packages/celery/app/", line 34, in run
return task._orig_run(*args, **kwargs)
File "/app/celery_tasks/", line 69, in wrapper
return f(*args, **kwargs) # <--- inside with_temp_asyncio_loop from utils
File "/usr/local/lib/python3.9/", line 575, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/app/db/", line 50, in __aenter__
if not await self.acquire_lock():
File "/app/db/", line 41, in acquire_lock
return await redis.setnx(self.lock_id, self.value)
File "/usr/local/lib/python3.9/site-packages/aioredis/", line 1064, in execute_command
return await self.parse_response(conn, command_name, **options)
File "/usr/local/lib/python3.9/site-packages/aioredis/", line 1080, in parse_response
response = await connection.read_response()
File "/usr/local/lib/python3.9/site-packages/aioredis/", line 859, in read_response
await self.disconnect()
File "/usr/local/lib/python3.9/site-packages/aioredis/", line 762, in disconnect
await self._writer.wait_closed()
File "/usr/local/lib/python3.9/asyncio/", line 359, in wait_closed
await self._protocol._get_close_waiter(self)
RuntimeError: await wasn't used with future
Use the solo
pool, then create a decorator that runs the task function with
asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))
and make your task asynchronous:
def sync(f):
    """Turn the coroutine function *f* into a blocking, synchronous callable.

    Each call creates the coroutine ``f(*args, **kwargs)`` and runs it to
    completion on the loop returned by ``asyncio.get_event_loop()``.

    :param f: coroutine function to wrap
    :return: synchronous wrapper with the same name and docstring as *f*
    """
    # Imported locally so this definition does not depend on the file's import block.
    from functools import wraps

    # wraps() keeps __name__/__doc__ of the original, so anything stacked on
    # top of this decorator still sees the real function name, not "wrapper".
    @wraps(f)
    def wrapper(*args, **kwargs):
        return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))

    return wrapper
async def task():