I have a fairly typical websocket-client program that reads from the wss server.
import asyncio
import json
import websockets
api_data = {
"type": "subscribe",
"channel": "v4_orderbook",
"id": 'BTC-USD',
}
async def wsrun(uri):
async for websocket in websockets.connect(uri):
await websocket.send(json.dumps(api_data))
while True:
print(await websocket.recv())
asyncio.get_event_loop().run_until_complete(wsrun('wss://indexer.v4testnet.dydx.exchange/v4/ws'))
The above program as it is written above just prints all the data to the screen, but what I need to do is pass the data to another python program asynchronously. So that means i have to replace the print() statement but I have no idea what to do.
The reason I have to do that is because the second program may need to take time to process the data while the first program (that reads the websocket) can't be delayed at all and must keep running and not lose any messages. Given a sufficiently large buffer so that the first program can keep pumping data into the buffer without waiting for the second program.
I am working on the assumption that it might be advantageous, depending on what client.py needs to do to process a message, to have multiple processes processing these messages in parallel. If that is not the case, then in server.py you can set the n_processes
variable to 1.
We create a function process_message
that processes a single message:
"""client.py"""
def process_message(message):
"""Do something with message."""
...
We create a multiprocessing pool with N processes (N might be 1 as previously described) that can process N messages in parallel to improve performance. Then for each input message we submit a new task to the processing pool:
"""server.py"""
import asyncio
import json
import websockets
from multiprocessing import Pool, cpu_count
from client import process_message
api_data = {
"type": "subscribe",
"channel": "v4_orderbook",
"id": 'BTC-USD',
}
async def wsrun(uri, pool):
async for websocket in websockets.connect(uri):
await websocket.send(json.dumps(api_data))
while True:
message = await websocket.recv()
# Submit a new task to the multiprocessing pool:
pool.apply_async(process_message, args=(message,))
def main():
# Create a process pool:
n_processes = cpu_count() - 1 # Leave a core for us
pool = Pool(n_processes)
while True:
try:
uri = some_uri
asyncio.run(wsrun(uri, pool))
except Exception as error:
pass # Ignore exception (or log it)
else:
# If wsrun ever returns, then wait for all submitted
# tasks (i.e. messages to process) to complete:
pool.close()
pool.join()
break
if __name__ == '__main__': # Required by Windows, for example
main()
asyncio.loop.run_in_executor
"""server.py"""
import asyncio
import json
import websockets
from os import cpu_count
from concurrent.futures import ProcessPoolExecutor
from client import process_message
api_data = {
"type": "subscribe",
"channel": "v4_orderbook",
"id": 'BTC-USD',
}
async def wsrun(uri, pool):
loop = asyncio.get_running_loop()
async for websocket in websockets.connect(uri):
await websocket.send(json.dumps(api_data))
while True:
message = await websocket.recv()
# Submit a new task to the multiprocessing pool:
loop.run_in_executor(pool, process_message, message)
def main():
# Create a process pool:
n_processes = cpu_count() - 1 # Leave a core for us
pool = ProcessPoolExecutor(n_processes)
while True:
try:
uri = some_uri
asyncio.run(wsrun(uri, pool))
except Exception as error:
pass # Ignore exception (or log it)
else:
# If wsrun ever returns, then wait for all submitted
# tasks (i.e. messages to process) to complete:
pool.shutdown()
break
if __name__ == '__main__': # Required by Windows, for example
main()