Tags: database, mongodb, structure, real-time-updates, changestream

MongoDB find and watch


I am trying to implement a structure that keeps itself up to date using a combination of find and watch. For example, for a users collection we find all users and store them in a hashmap. After that we open a watch stream and update the hashmap according to incoming events. But there is a problem: if a change occurs between reading the users and opening the stream, we might end up desynchronized.
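The update logic I have in mind looks roughly like this (a sketch; `applyChange` is a placeholder name, and `update` events assume the `fullDocument: 'updateLookup'` stream option):

```javascript
// Apply one change stream event to an in-memory Map keyed by _id.
function applyChange(store, event) {
  const id = String(event.documentKey._id);
  switch (event.operationType) {
    case 'insert':
    case 'replace':
    case 'update': // fullDocument requires the 'updateLookup' option
      if (event.fullDocument) store.set(id, event.fullDocument);
      break;
    case 'delete':
      store.delete(id);
      break;
  }
}
```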

My idea was to use the startAtOperationTime option, but we can't reliably know the timestamp we need. I also don't see a way to do it with transactions.

The question is: how do we open a change stream immediately after the read operation without losing any data?


Solution

  • Great question

    IMHO, a watch with delta changes emitted is pretty useless without knowing what initial state those deltas should apply to, so of course you need to do a find too. AFAIK there is no atomic "find and watch" command for mongo.

    Even removing the deltas from the problem and only using the fullDocument=updateLookup option, there is still the problem of synchronisation. Consider this approach:

    1. collection.watch()
    2. changeStream.pause()
    3. await collection.find().toArray()
    4. changeStream.resume()

    We need to set up the watch first so that we catch anything that happens while we are finding. At first glance, this looks good. If something happened while you were doing the find, and there are some change events in the stream, these will get processed when you resume. Since you started the watch before the find, the worst-case scenario is that the change stream contains some changes that occurred before the find, which could cause you to save stale state. This normally won't matter, since the change stream will also contain the most recent update and will eventually save the correct state.
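    The four steps above can be sketched with the Node.js driver, where a ChangeStream is a Readable stream supporting pause()/resume() (`loadAndWatch` and the `store` argument are hypothetical names):

    ```javascript
    // Open the stream first, pause it so events buffer, snapshot with
    // find(), then resume so buffered events replay over the snapshot.
    async function loadAndWatch(collection, store) {
      // 1. Open the change stream before reading, so nothing slips through.
      const changeStream = collection.watch([], { fullDocument: 'updateLookup' });

      changeStream.on('change', (event) => {
        if (event.operationType === 'delete') {
          store.delete(String(event.documentKey._id));
        } else if (event.fullDocument) {
          store.set(String(event.fullDocument._id), event.fullDocument);
        }
      });

      // 2. Pause so change events accumulate while we snapshot.
      changeStream.pause();

      // 3. Snapshot the collection into the in-memory map.
      for (const doc of await collection.find({}).toArray()) {
        store.set(String(doc._id), doc);
      }

      // 4. Resume: buffered events (some possibly older than the snapshot)
      //    replay and eventually converge on the latest state.
      changeStream.resume();
      return changeStream;
    }
    ```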

    But did the watch actually run on the mongo cluster before the find?

    And did the find query even go to a node in the cluster that is properly synchronised?

    Due to the concepts of unifiedTopology and connection pooling, I don't think we can possibly know the answers to these questions. We don't know which connection the above commands will ultimately use. If mongo is offline, the client will buffer commands like watch and find, then release them when a connection becomes available in the pool. But since there are multiple connections in play (which may choke at just the wrong time), there is no guarantee that the watch command in line 1 will arrive at mongo before the find command in line 3. So I'm pretty sure this approach will work 99% of the time, but it isn't 100% reliable.

    You could attempt to confirm that the changeStream is connected by checking the resumeTokenChanged event, then do the find, but if your cluster is having issues where some nodes' oplogs are taking a while to sync, you still might "find()" old data and start watching too late. This means missed updates.
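    That check could look something like this (a sketch; `waitForResumeToken` is a hypothetical helper, while resumeTokenChanged is the event the Node.js driver's ChangeStream emits):

    ```javascript
    // Resolve once the stream reports a resume token, or reject after
    // timeoutMs so we can treat the stream (and our data) as stale.
    function waitForResumeToken(changeStream, timeoutMs) {
      return new Promise((resolve, reject) => {
        const timer = setTimeout(() => {
          reject(new Error('change stream considered stale'));
        }, timeoutMs);
        changeStream.once('resumeTokenChanged', (token) => {
          clearTimeout(timer);
          resolve(token);
        });
      });
    }
    ```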

    Update

    I think the best solution we can hope for is to do a combination of everything above plus add a delay based on your desired grace period for cluster synchronisation.

    1. collection.watch()
    2. Wait until the changeStream is fresh by checking the resumeTokenChanged event. This also means we are connected.
    3. Set up a timeout to consider the changeStream stale if we don't receive resumeTokenChanged in time. This means our data could be stale too.
    4. Wait a grace period for cluster node synchronisation.
    5. changeStream.pause()
    6. If we still haven't received any change events from the changeStream, then await collection.find().toArray()
    7. changeStream.resume()
    8. If the changeStream emits the error event, start the whole process again.
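    The eight steps combined might look like this sketch (`syncedWatch`, `staleMs`, and `graceMs` are hypothetical names, not driver options; the stream follows the Node.js driver's ChangeStream/EventEmitter API):

    ```javascript
    const { once } = require('events');

    async function syncedWatch(collection, store, { staleMs = 5000, graceMs = 1000 } = {}) {
      // 1. Open the change stream first.
      const changeStream = collection.watch([], { fullDocument: 'updateLookup' });

      let sawChange = false;
      changeStream.on('change', (event) => {
        sawChange = true;
        if (event.operationType === 'delete') {
          store.delete(String(event.documentKey._id));
        } else if (event.fullDocument) {
          store.set(String(event.fullDocument._id), event.fullDocument);
        }
      });

      // 8. If the stream errors, start the whole process again.
      changeStream.on('error', () => {
        syncedWatch(collection, store, { staleMs, graceMs }).catch(() => {});
      });

      // 2-3. Wait for a resume token, or consider the stream stale after staleMs.
      await Promise.race([
        once(changeStream, 'resumeTokenChanged'),
        new Promise((_, reject) =>
          setTimeout(() => reject(new Error('change stream stale')), staleMs)
        ),
      ]);

      // 4. Grace period for cluster node synchronisation.
      await new Promise((resolve) => setTimeout(resolve, graceMs));

      // 5-7. Pause, snapshot only if no change events arrived yet, resume.
      changeStream.pause();
      if (!sawChange) {
        for (const doc of await collection.find({}).toArray()) {
          store.set(String(doc._id), doc);
        }
      }
      changeStream.resume();
      return changeStream;
    }
    ```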