Tags: python, asynchronous, async-await, python-asyncio

Python asyncio: Concurrent write tasks frequently canceled, how to ensure alternating execution?


I'm developing an asynchronous cache system in Python using asyncio. I have two concurrent write operations that I want to alternate, but currently only one side's tasks are frequently canceled.

Here is my current implementation in Python 3.10:

import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from copy import deepcopy
from datetime import datetime


class LocalCacheUserToken:
    _g: dict[str, dict] = {}
    _tasks: dict[str, asyncio.Task] = {}
    _locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
    _write_lock = asyncio.Lock()

    @classmethod
    @asynccontextmanager
    async def acquire_lock(cls, key: str):
        async with cls._locks[key]:
            yield

    @classmethod
    async def get(cls, key: str):
        async with cls.acquire_lock(key):
            value = deepcopy(cls._g.get(key, None))
        return value

    @classmethod
    async def setex(cls, key: str, time: int, value: dict):
        async with cls._write_lock:
            async with cls.acquire_lock(key):
                print(f"[{datetime.now()}] {value=} get lock")
                if key in cls._tasks:
                    cls._tasks[key].cancel()

                cls._g[key] = value

            task = asyncio.create_task(cls._delay_delete(key, time, value))
            cls._tasks[key] = task

    @classmethod
    async def _delay_delete(cls, key, time, value):
        try:
            await asyncio.sleep(time)
            async with cls.acquire_lock(key):
                del cls._g[key]
            cls._tasks.pop(key, None)
        except asyncio.CancelledError:
            print(f"[{datetime.now()}] Deletion task for {key=}, {value=} was cancelled")


async def p(a: LocalCacheUserToken):
    while True:
        print(f"[{datetime.now()}] {await a.get('test')=}")
        await asyncio.sleep(1)


async def write(a: LocalCacheUserToken, k: str):
    for i in range(20):
        await a.setex("test", 5, {'value': f'{k}->{i}'})
        await asyncio.sleep(0.01)


# Example usage
async def main():
    asyncio.create_task(p(LocalCacheUserToken))
    asyncio.create_task(write(LocalCacheUserToken, k='test1'))
    asyncio.create_task(write(LocalCacheUserToken, k='test2'))

    await asyncio.sleep(10)


asyncio.run(main())

Output:

[2024-06-28 01:42:35.054703] await a.get('test')=None
[2024-06-28 01:42:35.054703] value={'value': 'test1->0'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->0'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test1->1'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->1'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->0'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->2'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->2'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->1'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->3'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->3'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->2'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->4'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->4'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->3'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->5'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->5'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->4'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->6'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->6'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->5'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->7'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->7'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->6'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->8'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->8'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->7'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->9'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->9'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->8'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->10'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->10'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->9'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->11'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->11'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->10'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->12'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->12'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->11'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->13'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->13'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->12'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->14'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->14'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->13'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->15'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->15'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->14'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->16'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->16'} get lock
[2024-06-28 01:42:35.054703] Deletion task for key='test', value={'value': 'test2->15'} was cancelled
[2024-06-28 01:42:35.054703] value={'value': 'test1->17'} get lock
[2024-06-28 01:42:35.054703] value={'value': 'test2->17'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->16'} was cancelled
[2024-06-28 01:42:35.070327] value={'value': 'test1->18'} get lock
[2024-06-28 01:42:35.070327] value={'value': 'test2->18'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->17'} was cancelled
[2024-06-28 01:42:35.070327] value={'value': 'test1->19'} get lock
[2024-06-28 01:42:35.070327] value={'value': 'test2->19'} get lock
[2024-06-28 01:42:35.070327] Deletion task for key='test', value={'value': 'test2->18'} was cancelled
[2024-06-28 01:42:36.056942] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:37.063445] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:38.064713] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:39.069732] await a.get('test')={'value': 'test2->19'}
[2024-06-28 01:42:40.077492] await a.get('test')=None
[2024-06-28 01:42:41.087926] await a.get('test')=None
[2024-06-28 01:42:42.103530] await a.get('test')=None
[2024-06-28 01:42:43.107391] await a.get('test')=None
[2024-06-28 01:42:44.108523] await a.get('test')=None

What is the reason for this problem, and how can I fix it?


Solution

  • The problem is that write(LocalCacheUserToken, k='test2') cancels the deletion task created by write(LocalCacheUserToken, k='test1') in the same iteration of the event loop, before that task has run for the first time. A task that is cancelled before its first step never executes any of its coroutine's body: the CancelledError is raised at the very start of cls._delay_delete(...), outside the try block, so the except asyncio.CancelledError branch and its print statement never run. The tasks created by test1 are therefore cancelled just as often as those created by test2; they are simply cancelled silently. To print the message even when cls._delay_delete(...) is cancelled before it starts, add a "done" callback to the task and remove the try/except from cls._delay_delete(...), as follows:

    ...
        @classmethod
        async def setex(cls, key: str, time: int, value: dict):
            async with cls._write_lock:
                async with cls.acquire_lock(key):
                    print(f"[{datetime.now()}] {value=} get lock")
                    if key in cls._tasks:
                        cls._tasks[key].cancel()
    
                    cls._g[key] = value
    
                def on_delay_delete_done(task):
                    if task.cancelled():
                        print(f"[{datetime.now()}] Deletion task "
                              f"for {key=}, {value=} was cancelled")
    
                task = asyncio.create_task(cls._delay_delete(key, time))
                task.add_done_callback(on_delay_delete_done)
                cls._tasks[key] = task
    
        @classmethod
        async def _delay_delete(cls, key, time):
            await asyncio.sleep(time)
            async with cls.acquire_lock(key):
                del cls._g[key]
            cls._tasks.pop(key, None)
    ...
    

    Now we get the intended output:

    [2024-06-28 00:47:22.088757] await a.get('test')=None
    [2024-06-28 00:47:22.088836] value={'value': 'test1->0'} get lock
    [2024-06-28 00:47:22.088887] value={'value': 'test2->0'} get lock
    [2024-06-28 00:47:22.088952] Deletion task for key='test', value={'value': 'test1->0'} was cancelled
    [2024-06-28 00:47:22.099279] value={'value': 'test1->1'} get lock
    [2024-06-28 00:47:22.099466] value={'value': 'test2->1'} get lock
    [2024-06-28 00:47:22.099549] Deletion task for key='test', value={'value': 'test2->0'} was cancelled
    [2024-06-28 00:47:22.099572] Deletion task for key='test', value={'value': 'test1->1'} was cancelled
    [2024-06-28 00:47:22.109923] value={'value': 'test1->2'} get lock
    [2024-06-28 00:47:22.110013] value={'value': 'test2->2'} get lock
    [2024-06-28 00:47:22.110125] Deletion task for key='test', value={'value': 'test2->1'} was cancelled
    [2024-06-28 00:47:22.110163] Deletion task for key='test', value={'value': 'test1->2'} was cancelled
    [2024-06-28 00:47:22.120440] value={'value': 'test1->3'} get lock
    [2024-06-28 00:47:22.120602] value={'value': 'test2->3'} get lock
    [2024-06-28 00:47:22.120685] Deletion task for key='test', value={'value': 'test2->2'} was cancelled
    [2024-06-28 00:47:22.120709] Deletion task for key='test', value={'value': 'test1->3'} was cancelled
    [2024-06-28 00:47:22.130982] value={'value': 'test1->4'} get lock
    [2024-06-28 00:47:22.131126] value={'value': 'test2->4'} get lock
    [2024-06-28 00:47:22.131210] Deletion task for key='test', value={'value': 'test2->3'} was cancelled
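
The difference between the two cancellation cases can be reproduced in isolation. The following is a minimal sketch, not part of the original code: worker, demo, early_log and late_log are hypothetical names used only for illustration. It cancels one task before it has had a chance to run and another after it has reached its first await, and records what each coroutine body observed.

```python
import asyncio


async def worker(log: list) -> None:
    # Mirrors the shape of _delay_delete: the try block is inside the body.
    try:
        log.append("started")
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        log.append("cancelled inside coroutine")
        raise


async def demo() -> tuple:
    # Case 1: cancel immediately. There is no await between create_task()
    # and cancel(), so worker() has not executed a single line yet.
    early_log: list = []
    early = asyncio.create_task(worker(early_log))
    early.cancel()
    await asyncio.gather(early, return_exceptions=True)

    # Case 2: yield to the event loop once so worker() runs up to its
    # sleep(1), then cancel it while it is suspended there.
    late_log: list = []
    late = asyncio.create_task(worker(late_log))
    await asyncio.sleep(0)
    late.cancel()
    await asyncio.gather(late, return_exceptions=True)

    return early_log, late_log


early_log, late_log = asyncio.run(demo())
print(early_log)  # []
print(late_log)   # ['started', 'cancelled inside coroutine']
```

The first task is cancelled just like the second, but its except branch never runs, which matches the missing "was cancelled" lines for test1 in the question's output.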
    

    A general word of advice would be to try to make this code simpler before proceeding, because it is pretty hard to reason about. It's also possible that it needs to be this complicated. Good luck if that's the case :)
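
As one possible direction for the simplification suggested above, here is a hedged sketch that stores a deadline with each entry and expires it lazily on read, so no deletion tasks or locks are needed. This is an assumption about what would fit the use case, not the original design: SimpleTTLCache and its method names are hypothetical, and it relies on all access happening from a single event loop with no await inside setex or get.

```python
import asyncio
import time
from copy import deepcopy
from typing import Optional


class SimpleTTLCache:
    """Hypothetical simplified cache: each entry carries its own deadline."""

    def __init__(self) -> None:
        # key -> (deadline on the monotonic clock, stored value)
        self._data = {}

    def setex(self, key: str, ttl: float, value: dict) -> None:
        # No await here, so no other coroutine can interleave with the write.
        self._data[key] = (time.monotonic() + ttl, deepcopy(value))

    def get(self, key: str) -> Optional[dict]:
        entry = self._data.get(key)
        if entry is None:
            return None
        deadline, value = entry
        if time.monotonic() >= deadline:
            # Expired: drop the entry on read instead of in a background task.
            del self._data[key]
            return None
        return deepcopy(value)


async def main() -> list:
    cache = SimpleTTLCache()
    seen = []
    cache.setex("test", 0.05, {"value": "test1->0"})
    cache.setex("test", 0.05, {"value": "test2->0"})  # overwrite resets the deadline
    seen.append(cache.get("test"))
    await asyncio.sleep(0.1)  # wait past the 0.05 s time-to-live
    seen.append(cache.get("test"))
    return seen


seen = asyncio.run(main())
print(seen)  # [{'value': 'test2->0'}, None]
```

The trade-off is that an entry that is never read again stays in memory until something touches it; if that matters, a periodic sweep would have to be added.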