python multiprocessing python-asyncio

How to simultaneously run multiple instances of a Class which have async functions?


class calculation():
    """Hold a number and endlessly print multiples of it from coroutines."""

    def __init__(self, value):
        # Base number that one() and two() scale before printing.
        self.value = value

    async def _print_forever(self, factor):
        """Print ``self.value * factor``, sleep one second, and repeat forever."""
        while True:
            print(self.value * factor)
            await asyncio.sleep(1)

    async def one(self):
        """Endlessly print ``value * 1``, pausing one second after each print."""
        await self._print_forever(1)

    async def two(self):
        """Endlessly print ``value * 2``, pausing one second after each print."""
        await self._print_forever(2)

    async def starter(self):
        """Run one() and two() concurrently; never completes, as both loop forever."""
        jobs = (self.one(), self.two())
        await asyncio.gather(*jobs)
           

if __name__ == '__main__':
    
    a = calculation(2)
    b = calculation(3)
   
    # NOTE: asyncio.run(a.starter()) is a call expression, so it is evaluated
    # right here in the parent process, before Process() is even constructed;
    # its return value (not a callable) is what would be passed as `target`.
    # Because one() and two() loop forever, that call never returns, so the
    # statements below this line are never reached and `b` never runs.
    p1 = multiprocessing.Process(target=asyncio.run(a.starter()))
    p2 = multiprocessing.Process(target=asyncio.run(b.starter()))
    p1.start()
    p2.start()

I tried to run the above code, but only the first instance (a) runs; the second instance (b) is blocked from running. Is it possible to run both instances simultaneously? Thanks.


Solution

  • Based on what one and two are doing (printing, then awaiting asyncio.sleep), there is no need for threads or child processes; a single event loop can run both instances concurrently:

    import asyncio
    import time
    
    class Calculation():
        """Print a few multiples of ``value``, each stamped with the elapsed time.

        The coroutines read the module-level ``start_time``, which must be
        assigned before they run.
        """

        def __init__(self, value):
            # Base number that one() and two() scale before printing.
            self.value = value

        async def one(self):
            """Print ``value * 1`` three times, sleeping one second after each."""
            remaining = 3  # bounded so that the demo ends eventually
            while remaining > 0:
                print(f'{self.value} * 1 = {self.value * 1} (time = {time.monotonic() - start_time})')
                await asyncio.sleep(1)
                remaining -= 1

        async def two(self):
            """Print ``value * 2`` three times, sleeping one second after each."""
            remaining = 3  # bounded so that the demo ends eventually
            while remaining > 0:
                print(f'{self.value} * 2 = {self.value * 2}  (time = {time.monotonic() - start_time})')
                await asyncio.sleep(1)
                remaining -= 1

        async def starter(self):
            """Run one() and two() concurrently and wait for both to finish."""
            jobs = (self.one(), self.two())
            await asyncio.gather(*jobs)
    
    
    async def run_calculations():
        """Create two Calculation objects and run their starters concurrently.

        Sets the module-level ``start_time`` just before the coroutines are
        gathered.
        """
        global start_time

        calculations = [Calculation(2), Calculation(3)]
        start_time = time.monotonic()
        starters = [calc.starter() for calc in calculations]
        await asyncio.gather(*starters)
    
    if __name__ == '__main__':
        # One event loop drives both Calculation instances.
        main_coro = run_calculations()
        asyncio.run(main_coro)
    

    Prints:

    2 * 1 = 2 (time = 0.0)
    2 * 2 = 4  (time = 0.0)
    3 * 1 = 3 (time = 0.0)
    3 * 2 = 6  (time = 0.0)
    2 * 1 = 2 (time = 1.0159999998286366)
    3 * 1 = 3 (time = 1.0159999998286366)
    2 * 2 = 4  (time = 1.0159999998286366)
    3 * 2 = 6  (time = 1.0159999998286366)
    2 * 1 = 2 (time = 2.030999999959022)
    2 * 2 = 4  (time = 2.030999999959022)
    3 * 1 = 3 (time = 2.030999999959022)
    3 * 2 = 6  (time = 2.030999999959022)
    

    Update for Multiprocessing

    If you really need to use multiprocessing because your asyncio coroutines one and two perform CPU-intensive calculations, I would suggest that you put those calculations into a "regular" method (i.e. not a coroutine), called blocking_code in this demo, and run it with the event loop's run_in_executor method as follows:

    import asyncio
    import concurrent.futures
    import time
    
    class Calculation():
        """Print multiples of ``value`` by running blocking code in an executor."""

        def __init__(self, value):
            self.value = value
            # Reference instant for the elapsed times printed by blocking_code().
            self.start_time = time.monotonic()

        def blocking_code(self, multiplier):
            """Synchronously print ``value * multiplier`` and the elapsed time."""
            message = f'{self.value} * {multiplier} = {self.value * multiplier} (time = {time.monotonic() - self.start_time})'
            print(message)

        async def _run_repeatedly(self, pool, multiplier):
            """Run blocking_code(multiplier) in ``pool`` three times, sleeping 1 s after each."""
            loop = asyncio.get_running_loop()
            for _ in range(3):  # bounded so that the demo ends eventually
                await loop.run_in_executor(pool, self.blocking_code, multiplier)
                await asyncio.sleep(1)

        async def one(self, pool):
            """Submit blocking_code(1) to ``pool`` three times, one second apart."""
            await self._run_repeatedly(pool, 1)

        async def two(self, pool):
            """Submit blocking_code(2) to ``pool`` three times, one second apart."""
            await self._run_repeatedly(pool, 2)

        async def starter(self, pool):
            """Run one() and two() concurrently against the same executor."""
            jobs = (self.one(pool), self.two(pool))
            await asyncio.gather(*jobs)
    
    
    async def run_calculations():
        """Run two Calculation objects concurrently, sharing one 4-worker process pool."""
        calculations = (Calculation(2), Calculation(3))
        with concurrent.futures.ProcessPoolExecutor(4) as pool:
            starters = [calc.starter(pool) for calc in calculations]
            await asyncio.gather(*starters)
    
    if __name__ == '__main__':
        # run_calculations() creates the process pool; start it from the entry guard.
        main_coro = run_calculations()
        asyncio.run(main_coro)
    

    Prints:

    2 * 1 = 2 (time = 0.2190000000409782)
    2 * 2 = 4 (time = 0.2190000000409782)
    3 * 1 = 3 (time = 0.2190000000409782)
    3 * 2 = 6 (time = 0.2190000000409782)
    2 * 1 = 2 (time = 1.2190000000409782)
    3 * 1 = 3 (time = 1.2190000000409782)
    3 * 2 = 6 (time = 1.2350000003352761)
    2 * 2 = 4 (time = 1.2350000003352761)
    2 * 1 = 2 (time = 2.25)
    3 * 1 = 3 (time = 2.25)
    2 * 2 = 4 (time = 2.25)
    3 * 2 = 6 (time = 2.25)
    

    If blocking_code is not particularly CPU-intensive (for example, if it retrieves a URL using the requests module) but does not use asyncio, then you can initialize pool to a concurrent.futures.ThreadPoolExecutor instance instead.