I am trying to implement a structure that keeps itself up to date using a combination of find and watch. For example, for a users collection we find all users and store them in a hashmap; after that we open a watch stream and update the hashmap according to incoming events (see the sketch below). But there is a problem: if a change occurs between reading the users and opening the stream, we can end up desynchronized.
My idea was to use the `startAtOperationTime` option, but we can't reliably know the timestamp we need. I also don't see a way to do this with transactions.
The question is: how do we open a change stream exactly after the read operation so that we don't lose any data?
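Schematically, the current approach looks like this (a simplified sketch; `users` is the collection handle, and connection setup and error handling are omitted):

```js
const cache = new Map();

// 1. Read the initial state into the map.
for (const doc of await users.find().toArray()) {
  cache.set(String(doc._id), doc);
}

// 2. Open a change stream and keep the map up to date.
const changeStream = users.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', (event) => {
  if (event.operationType === 'delete') {
    cache.delete(String(event.documentKey._id));
  } else if (event.fullDocument) {
    cache.set(String(event.fullDocument._id), event.fullDocument);
  }
});

// Problem: anything that changed between steps 1 and 2 is lost.
```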
Great question
IMHO, a watch with delta changes emitted is pretty useless without knowing what initial state those deltas should apply to, so of course you need to do a find too. AFAIK there is no atomic "find and watch" command for mongo.
Even taking the deltas out of the problem and using only the `fullDocument: 'updateLookup'` option, there is still a synchronisation problem. Consider this approach: set up the watch first, so we catch anything that happens while we are finding.
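In code, something like this (a minimal fragment; `handleChange` stands in for whatever applies events to your hashmap):

```js
const changeStream = collection.watch();        // line 1: open the stream first
changeStream.on('change', handleChange);        // line 2: start consuming events
const docs = await collection.find().toArray(); // line 3: then read the initial state
```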
On first glance, this looks good. If something happened while you were doing the find and there are change events in the stream, they will get processed when you resume. Since you started the watch before the find, the worst case is that the change stream contains some changes that occurred before the find, which can briefly save old state. This normally won't matter, since the change stream will also contain the most recent update and will eventually save the correct state.
But did the watch actually run on the mongo cluster before the find?
And did the find query even go to a node in the cluster that is properly synchronised?
Due to the concepts of `unifiedTopology` and connection pooling, I don't think we can possibly know the answers to these questions. We don't know which connection each of the above commands will ultimately use. If mongo is offline, the client will buffer commands like `watch` and `find`, then release them when a connection becomes available in the pool. But since there are multiple connections in play (and any of them may choke at just the wrong time), there is no guarantee that the watch command on line 1 will arrive at mongo before the find command on line 3. So I'm pretty sure this approach will work 99% of the time, but it isn't 100% reliable.
You could attempt to confirm that the change stream is connected by waiting for the `resumeTokenChanged` event and then doing the find, but if your cluster is having issues where some nodes' oplogs take a while to sync, you might still find() old data and start watching too late. That means missed updates.
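For instance (a sketch, assuming the Node driver's `resumeTokenChanged` event):

```js
const changeStream = collection.watch();

// Wait until the stream reports a resume token, i.e. it is actually live.
await new Promise((resolve, reject) => {
  changeStream.once('resumeTokenChanged', resolve);
  changeStream.once('error', reject);
});

const docs = await collection.find().toArray(); // may still hit a lagging node
```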
Update
I think the best solution we can hope for is a combination of everything above, plus a delay based on your desired grace period for cluster synchronisation (a sketch follows the steps):
1. Start the change stream: `collection.watch()`.
2. Wait for the first `resumeTokenChanged` event. This also means we are connected.
3. Wait out your grace period, in case a lagging node hasn't delivered its `resumeTokenChanged` in time. This means our data could be stale too.
4. Call `changeStream.pause()` so we don't process `change` events from the changeStream, then `await collection.find().toArray()`.
5. Call `changeStream.resume()` so the buffered events get applied on top of the found data.
6. On an `error` event, start the whole process again.
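Putting those steps together, a rough sketch (assuming the 3.x Node driver, where the change stream exposes `pause()`/`resume()` and the `resumeTokenChanged` event; `cache`, `applyChange`, and the grace period value are placeholders to adapt):

```js
const GRACE_PERIOD_MS = 5000; // placeholder: your grace period for cluster synchronisation

async function syncCollection(collection, cache) {
  // 1. Start the change stream.
  const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
  changeStream.on('change', (event) => applyChange(cache, event)); // applyChange: your map-update logic

  // 6. On an error event, start the whole process again.
  changeStream.on('error', () => {
    changeStream.close().catch(() => {});
    syncCollection(collection, cache);
  });

  // 2. Wait for the first resumeTokenChanged event; this also means we are connected.
  await new Promise((resolve) => changeStream.once('resumeTokenChanged', resolve));

  // 3. Grace period, in case a lagging node hasn't caught up yet.
  await new Promise((resolve) => setTimeout(resolve, GRACE_PERIOD_MS));

  // 4. Pause so no change events are processed while we read the initial state.
  changeStream.pause();
  for (const doc of await collection.find().toArray()) {
    cache.set(String(doc._id), doc);
  }

  // 5. Resume; buffered events are now applied on top of the found data.
  changeStream.resume();
}
```

Note that the delay only shrinks the window; as discussed above, it isn't a hard guarantee.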