Recently I've gotten into the "crypto mania" and have started writing my own wrappers around the API's on some exchanges.
Binance in particular has an a streaming websocket endpoint.
where you can stream data but via a websocket endpoint. I thought I'd try this out on my own using sanic.
here is my websocket route
@ws_routes.websocket("/hello")
async def hello(request, ws):
while True:
await ws.send("hello")
now I have 2 clients on 2 different machines connecting to it
async def main():
async with aiohttp.ClientSession() as session:
ws = await session.ws_connect("ws://192.168.86.31:8000/hello")
while True:
data = await ws.receive()
print(data)
however only one of the clients will be able to connect and receive the sent data from the server. I'm assuming that because of the while
loop its blocking and preventing the other connection from connecting because it doesn't yield
?
how do we make it stream to multiple clients without blocking the other connections?
I looked into adding more workers and it seems to do the trick but what I don't understand is thats not a very scalable solution. because each client would be its own worker and if you have thousands or even just 10 clients that would be 10 workers 1 per client.
so how does Binance do their websocket streaming? or hell how does the twitter stream endpoint work?
how is it able to serve an infinite stream to multiple concurrent clients? because ultimately thats what I'm trying to do
The way to solve this would be something like this.
I am using the sanic
framework
class Stream:
def __init__(self):
self._connected_clients = set()
async def __call__(self, *args, **kwargs):
await self.stream(*args, **kwargs)
async def stream(self, request, ws):
self._connected_clients.add(ws)
while True:
disconnected_clients = []
for client in self._connected_clients: # check for disconnected clients
if client.state == 3: # append to a list because error will be raised if removed from set while iterating over it
disconnected_clients.append(client)
for client in disconnected_clients: # remove disconnected clients
self._connected_clients.remove(client)
await asyncio.wait([client.send("Hello") for client in self._connected_clients]))
ws_routes.add_websocket_route(Stream(), "/stream")
websocket
sessionlist
or set
websocket
sessions and remove from your websocket
sessions containerawait asyncio.wait([ws_session.send() for ws_session [list of valid sessions]])
which is basically a broadcast.5.profit!
this is basically the pubsub design pattern