Passing websocket data to another python program


I have a fairly typical websocket-client program that reads from the wss server.

import asyncio
import json
import websockets

api_data = {
        "type": "subscribe",
        "channel": "v4_orderbook",
        "id": 'BTC-USD',
}

async def wsrun(uri):
        async for websocket in websockets.connect(uri):
                await websocket.send(json.dumps(api_data))
                while True:
                        print(await websocket.recv())

asyncio.run(wsrun('wss://indexer.v4testnet.dydx.exchange/v4/ws'))

As written, the program just prints every message to the screen. What I need is to pass the data to another Python program asynchronously, so the print() call has to be replaced, but I have no idea what to replace it with.

The reason is that the second program may need time to process the data, while the first program (the one reading the websocket) can't be delayed at all: it must keep running and must not lose any messages. Ideally there would be a sufficiently large buffer between the two, so that the first program can keep pumping data into the buffer without waiting for the second program.


Solution

  • I am working on the assumption that it might be advantageous, depending on what client.py needs to do to process a message, to have multiple processes processing these messages in parallel. If that is not the case, then in server.py you can set the n_processes variable to 1.

    client.py

    We create a function process_message that processes a single message:

    """client.py"""
    
    def process_message(message):
        """Do something with message."""
    
        ...
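
As an illustration only, process_message might decode the JSON text and pull out a few fields. The keys read here ("type" and "channel") are assumptions about what the feed sends back, not something taken from the exchange's documentation, so substitute whatever your messages actually contain. The one firm requirement is that the function is defined at module top level, so that the pool can pickle a reference to it.

```python
"""client.py (illustrative sketch; the payload fields are assumptions)"""
import json


def process_message(message):
    """Decode one websocket message and return a small summary of it."""
    try:
        data = json.loads(message)
    except ValueError:
        # Not valid JSON (or not valid UTF-8): report it instead of
        # raising inside the worker process.
        return {"ok": False, "raw": message}

    if not isinstance(data, dict):
        return {"ok": False, "raw": message}

    # "type" and "channel" are hypothetical keys; adjust to the real payload.
    return {
        "ok": True,
        "type": data.get("type"),
        "channel": data.get("channel"),
    }
```

Returning a value rather than printing keeps the function easy to test on its own, without a websocket or a pool.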
    

    server.py

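    We create a multiprocessing pool with N processes (N might be 1, as previously described) that can process N messages in parallel to improve performance. Then for each input message we submit a new task to the pool. pool.apply_async returns immediately, and tasks that no worker is free to take yet wait in the pool's internal task queue, so that queue acts as the buffer between the websocket reader and the message processing: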

    """server.py"""
    import asyncio
    import json
    import websockets
    from multiprocessing import Pool, cpu_count
    
    from client import process_message
    
    api_data = {
        "type": "subscribe",
        "channel": "v4_orderbook",
        "id": 'BTC-USD',
    }
    
    async def wsrun(uri, pool):
        async for websocket in websockets.connect(uri):
            await websocket.send(json.dumps(api_data))
            while True:
                message = await websocket.recv()
                # Submit a new task to the multiprocessing pool:
                pool.apply_async(process_message, args=(message,))
    
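    def main():
        # Create a process pool, leaving a core for this process
        # (but never asking for fewer than one worker):
        n_processes = max(1, cpu_count() - 1)
        pool = Pool(n_processes)
        # The URI from the question:
        uri = 'wss://indexer.v4testnet.dydx.exchange/v4/ws'
    
        while True:
            try:
                asyncio.run(wsrun(uri, pool))
            except Exception as error:
                # Report the error, then loop around and reconnect:
                print(f'wsrun failed: {error!r}; reconnecting')
            else:
                # If wsrun ever returns, then wait for all submitted
                # tasks (i.e. messages to process) to complete:
                pool.close()
                pool.join()
                break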
    
    if __name__ == '__main__':  # Required by Windows, for example
        main()
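
One thing to watch with pool.apply_async: if process_message raises, the exception is stored on the returned AsyncResult and nothing is reported unless you call its get() method or pass an error_callback. A minimal sketch of the second option follows. The names report_failure, submit and shout are hypothetical helpers made up for this example; shout simply stands in for process_message.

```python
import logging
from multiprocessing import Pool

logger = logging.getLogger("server")


def report_failure(error):
    """Called in the parent process when a submitted task raised."""
    logger.error("process_message failed: %r", error)


def submit(pool, func, message):
    """Queue one message on the pool without waiting for the result."""
    return pool.apply_async(func, args=(message,), error_callback=report_failure)


def shout(message):
    """Stand-in for process_message; must be a top-level function."""
    return message.upper()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with Pool(2) as pool:
        result = submit(pool, shout, "hello")
        # get() is used here only to show the round trip; the websocket
        # loop would not block on individual results.
        print(result.get(timeout=10))
```

In server.py this would amount to adding error_callback=report_failure to the existing pool.apply_async call.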
    

    Update: the same approach using asyncio.loop.run_in_executor with a concurrent.futures.ProcessPoolExecutor

    server.py

    """server.py"""
    import asyncio
    import json
    import websockets
    from os import cpu_count
    from concurrent.futures import ProcessPoolExecutor
    
    from client import process_message
    
    api_data = {
        "type": "subscribe",
        "channel": "v4_orderbook",
        "id": 'BTC-USD',
    }
    
    async def wsrun(uri, pool):
        loop = asyncio.get_running_loop()
    
        async for websocket in websockets.connect(uri):
            await websocket.send(json.dumps(api_data))
            while True:
                message = await websocket.recv()
                # Submit a new task to the process pool executor:
                loop.run_in_executor(pool, process_message, message)
    
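    def main():
        # Create a process pool, leaving a core for this process
        # (os.cpu_count() can return None, and we need at least one worker):
        n_processes = max(1, (cpu_count() or 2) - 1)
        pool = ProcessPoolExecutor(n_processes)
        # The URI from the question:
        uri = 'wss://indexer.v4testnet.dydx.exchange/v4/ws'
    
        while True:
            try:
                asyncio.run(wsrun(uri, pool))
            except Exception as error:
                # Report the error, then loop around and reconnect:
                print(f'wsrun failed: {error!r}; reconnecting')
            else:
                # If wsrun ever returns, then wait for all submitted
                # tasks (i.e. messages to process) to complete:
                pool.shutdown()
                break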
    
    if __name__ == '__main__':  # Required by Windows, for example
        main()
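
In the run_in_executor version the returned future is discarded, so a failure inside process_message is easy to miss. One possible way to keep hold of the futures and log failures is sketched below. The names submit, _finished and pending are hypothetical, and the demo passes None as the executor (which selects the loop's default thread pool) only so that it runs anywhere; in server.py you would pass the ProcessPoolExecutor instead.

```python
import asyncio
import logging

logger = logging.getLogger("server")
pending = set()  # strong references to futures that are still in flight


def _finished(future):
    """Done callback: forget the future and log any failure."""
    pending.discard(future)
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        logger.error("process_message failed: %r", error)


def submit(loop, executor, func, message):
    """Hand one message to the executor without awaiting the result."""
    future = loop.run_in_executor(executor, func, message)
    pending.add(future)
    future.add_done_callback(_finished)
    return future


async def demo():
    loop = asyncio.get_running_loop()
    # executor=None selects the loop's default thread pool (demo only).
    future = submit(loop, None, str.upper, "hello")
    # Awaited here only to show the result; the websocket loop would not wait.
    return await future


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The size of pending also gives a rough measure of how far processing has fallen behind the websocket feed.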