pythonpython-3.xasynchronousasync-awaitpython-asyncio

How to write your own async-awaitable coroutine function in Python?


I'm trying to write my own awaiatbale function which could be used in an asyncio loop like asyncio.sleep() method or something like these pre-awaitable implemented methods.

Here is what I've done so far:

import asyncio

def coro1():
    for i in range(1, 10):
        yield i

def coro2():
    for i in range(1, 10):
        yield i*10

class Coro:  # Not used.
    def __await__(self):
        for i in range(1, 10):
            yield i * 100

@asyncio.coroutine
def wrapper1():
    return (yield from coro1())

@asyncio.coroutine
def wrapper2():
    return (yield from coro2())

for i in wrapper1():
    print(i)

print("Above result was obvious which I can iterate around a couroutine.".center(80, "#"))

async def async_wrapper():
    await wrapper1()
    await wrapper2()

asyncio.run(async_wrapper())

What I got as a result:

1
2
3
4
5
6
7
8
9
#######Above result was obvious which I can iterate around a couroutine.#########
Traceback (most recent call last):
  File "stack-coroutine.py", line 36, in <module>
    result = loop.run_until_complete(asyncio.gather(*futures))
  File "/usr/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "stack-coroutine.py", line 30, in async_wrapper
    await wrapper1()
  File "stack-coroutine.py", line 18, in wrapper1
    return (yield from coro1())
  File "stack-coroutine.py", line 5, in coro1
    yield i
RuntimeError: Task got bad yield: 1

What I expect as a result:

1
10
2
20
3
30
.
.
.

[NOTE]:


Solution

  • I found a concurrent/asynchronous approach using generators. However, it's not an asyncio approach:

    from collections import deque
    
    def coro1():
        for i in range(1, 5):
            yield i
    
    def coro2():
        for i in range(1, 5):
            yield i*10
    
    print('Async behaviour using default list with O(n)'.center(60, '#'))
    tasks = list()
    tasks.extend([coro1(), coro2()])
    
    while tasks:
        task = tasks.pop(0)
        try:
            print(next(task))
            tasks.append(task)
        except StopIteration:
            pass
    
    print('Async behaviour using deque with O(1)'.center(60, '#'))
    tasks = deque()
    tasks.extend([coro1(), coro2()])
    
    while tasks:
        task = tasks.popleft()  # select and remove a task (coro1/coro2).
        try:
            print(next(task))
            tasks.append(task)  # add the removed task (coro1/coro2) for permutation.
        except StopIteration:
            pass
    

    Out:

    ########Async behaviour using default list with O(n)########
    1
    10
    2
    20
    3
    30
    4
    40
    ###########Async behaviour using deque with O(1)############
    1
    10
    2
    20
    3
    30
    4
    40
    

    [UPDATE]:

    Finally, I've solved this through asyncio syntax:

    import asyncio
    
    async def coro1():
        for i in range(1, 6):
            print(i)
            await asyncio.sleep(0)  # switches task every one iteration.
    
    async def coro2():
        for i in range(1, 6):
            print(i * 10)
            await asyncio.sleep(0)  # switches task every one iteration.
    
    async def main():
        futures = [
            asyncio.create_task(coro1()),
            asyncio.create_task(coro2())
        ]
        await asyncio.gather(*futures)
    
    asyncio.run(main())
    

    Out:

    1
    10
    2
    20
    3
    30
    4
    40
    5
    50
    

    And another concurrency coroutine approach via async-await expression and an event-loop manager based on Heap queue algorithm, without using asyncio library and its event-loop as well as without using asyncio.sleep() method:

    import heapq
    from time import sleep
    from datetime import datetime, timedelta
    
    class Sleep:
        def __init__(self, seconds):
            self.sleep_until = datetime.now() + timedelta(seconds=seconds)
    
        def __await__(self):
            yield self.sleep_until
    
    async def coro1():
        for i in range(1, 6):
            await Sleep(0)
            print(i)
    
    async def coro2():
        for i in range(1, 6):
            await Sleep(0)
            print(i * 10)
    
    def coro_manager(*coros):
        coros = [(datetime.now(), coro) for coro in coros]
        heapq.heapify(coros)
        while coros:
            exec_at, coro = heapq.heappop(coros)
            if exec_at > datetime.now():
                sleep((exec_at - datetime.now()).total_seconds())
            try:
                heapq.heappush(coros, (coro.send(None), coro))
            except StopIteration:
                try:
                    coros.remove(coro)
                except ValueError:
                    pass
    
    coro_manager(coro1(), coro2())
    

    Out:

    1
    10
    2
    20
    3
    30
    4
    40
    5
    50