I have been working on an websocket connection for some time and encountered large latency and overflow problems with my processing.
My current architecture is 2 processes:
Main problem is that I still dont fulfil all the given rules for updating an local order book:
How to manage a local order book correctly
- Open a stream to wss://stream.binance.com:9443/ws/bnbbtc@depth.
- Buffer the events you receive from the stream.
- Get a depth snapshot from https://api.binance.com/api/v3/depth?symbol=BNBBTC&limit=1000 .
- Drop any event where u is <= lastUpdateId in the snapshot.
- The first processed event should have U <= lastUpdateId+1 AND u >= lastUpdateId+1.
- While listening to the stream, each new event's U should be equal to the previous event's u+1. ...
Condition number 5 is never met. We delete unnecessary updates but the condition is never met and also it seems that we create "gaps" in the update ID chains to when entering the orderBookUpdate loop we get some warnings that update entries are missing. For about 10 entries (the ones pushed back after initial deletions).
INFO:root:Websocket connection established
INFO:root:Initiated trades: {'_id': 1, 'trades': []}
INFO:root:Deleted 6 updates from depth bufferINFO:root:Order book initialization updateId not stable
INFO:root:Initiated orderbook: ...
INFO:root:Missing depth update id. Current id: 47878509997, next id: 47878507790
INFO:root:Missing depth update id. Current id: 47878507790, next id: 47878507809
INFO:root:Missing depth update id. Current id: 47878507809, next id: 47878507822
INFO:root:Missing depth update id. Current id: 47878507822, next id: 47878507832
INFO:root:Missing depth update id. Current id: 47878507832, next id: 47878507839
INFO:root:Missing depth update id. Current id: 47878507839, next id: 47878507847
And here the setup and processing of order book and updates in process 2. DataProcessor:
def initiateOrderBook(self):
# check if we already have updates in database
while True:
# delete old updates
updates = self.collection.find_one_and_update(
filter={"_id": 4},
update={
"$set": {"depthUpdates": []} # Reset depthUpdates to empty dict
},
projection={"depthUpdates": 1}, # Only return the pulled updates
return_document=pymongo.ReturnDocument.BEFORE
)
if updates is None:
logging.info('Updates not ready in database trying again...')
time.sleep(2)
else:
break
# order updates
updates = SortedDict(list((x['u'], x) for x in updates['depthUpdates'])) # order updates
# get orderbook snapshot
orderBook = self.getOrderbook()
# logic filter not needed updates
c = 0
for last_update_id, data in updates.items():
if last_update_id <= orderBook['lastUpdateId']:
updates.pop(last_update_id)
c += 1
logging.info(f'Deleted {c} updates from depth buffer')
# check if orderbook and updates are in order
first_item = updates.peekitem()[1]
if first_item['U'] <= orderBook['lastUpdateId'] + 1 <= first_item['u']:
logging.info(f"Order book is ready to update")
else:
logging.info(f"Order book initialization updateId not stable")
# push rest of updates back into db
filter_criteria = {"_id": 4}
update = {
"$push": {"depthUpdates": {"$each": list(updates.values())}}
}
self.collection.update_one(filter_criteria, update)
# set order book in database
document = self.collection.find_one_and_update(
filter={"_id": 2},
update={
"$set": {"orderBook": orderBook}
},
upsert=True,
return_document=pymongo.ReturnDocument.AFTER
)
logging.info(f'Initiated orderbook: {document}')
def orderBookUpdate(self, updatingSpeed):
while True:
# fetch
document = self.collection.find_one(filter={"_id": 2})
updates = self.collection.find_one_and_update(
filter={"_id": 4},
update={
"$set": {"depthUpdates": []} # Reset depthUpdates to empty dict
},
projection={"depthUpdates": 1}, # Only return the pulled updates
return_document=pymongo.ReturnDocument.BEFORE
)
# processing update
orderBook = document['orderBook']
updates = SortedDict(list((x['u'], x) for x in updates['depthUpdates'])) # order updates
# order book update logic
for entry in updates.values():
# check update chain integrety
if orderBook['lastUpdateId'] + 1 != entry['u']:
logging.info(f'Missing depth update id. Current id: {orderBook["lastUpdateId"]}, next id: {entry["u"]}')
for side in ['b', 'a']:
for price_level in entry[side]:
quantity = float(price_level[1])
if quantity == 0:
orderBook[side].pop(price_level[0], None)
else:
orderBook[side][price_level[0]] = quantity
orderBook['lastUpdateId'] = entry['u']
# Limit to the closest 1000 entries
orderBook['b'] = SortedDict(list(orderBook['b'].items())[-500:])
orderBook['a'] = SortedDict(list(orderBook['a'].items())[:500])
# Push the updated order book back to MongoDB
self.collection.update_one(filter={"_id": 2}, update={"$set": {"orderBook": orderBook}})
# updating speed
time.sleep(updatingSpeed)
Luckely nobody answered because the problems where just bad programming.
As to be expected editing an dict from inside an iteration of itself leads to unexpected behavior like wrong deletions in the updates. New approach:
# logic filter not needed updates
c = 0
toRemove = []
for last_update_id, data in updates.items():
if last_update_id <= orderBook['lastUpdateId']:
toRemove.append(last_update_id)
c += 1
# Remove the collected keys from the updates
for key in toRemove:
updates.pop(key)
logging.info(f'Deleted {c} updates from depth buffer')
Not a problem related but we also need to move the fetching of the order book into the loop with an small time delay so we guarantee that we have updates from bevor and after the fetch.
# check if we already have updates in database
while True:
# get orderbook snapshot
orderBook = self.getOrderbook()
# small delay to updates with u > then order book to collect
time.sleep(2)
...
Another thing is that .peekitem() works not as than expected. .peekitem(1) will deliver the first entry as wanted. Now all conditions are met.
Last thing in the UpdateOrderbook we need to convert the pulled bids and asks from the orderbook to SortedDicts bevor the slicing part. So we guarantee order bevor slicing.
# covert to ordered dicts
orderBook = document['orderBook']
orderBook['b'] = SortedDict(orderBook['b'])
orderBook['a'] = SortedDict(orderBook['a'])