node.jsmongodbmongodb-nodejs-driver

MongoDB watch randomly crashes (Cursor session id is not the same as the operation context's session id)


I am connecting to my MongoDB with the official node driver.

I am trying to watch changes, both on a specific collection and a specific document. Both ways are experiencing this error.

My code works perfectly fine most of the time, but sometimes while it's idle it randomly crashes my whole app. I would like to understand why this is happening, and catch the error so it doesn't break my whole app. Unfortunately, it never points to a specific line in my code.

Here is one of my codes (simplified):

let watcher = q.watch({ids: [documentId]}).on("change", (changeEvent) => { 
    //only listen to update operations
    if (changeEvent.operationType !== 'update') return;
    console.log('document was updated');
    watcher.cursor.close();
});

Which gave me this error:

node:events:371
      throw er; // Unhandled 'error' event
      ^

MongoServerError: Cursor session id (8c01dfe6-6a3e-479c-98ce-243d61ec24e4 - 6tctnoM1KQxUOgLaEIm7HYAvVSF3YGlrCMxn+SswizE=) is not the same as the operation context's session id (none)
    at MessageStream.messageHandler (C:\project\node_modules\mongodb\lib\cmap\connection.js:463:30)
    at MessageStream.emit (node:events:394:28)
    at processIncomingData (C:\project\node_modules\mongodb\lib\cmap\message_stream.js:108:16)
    at MessageStream._write (C:\project\node_modules\mongodb\lib\cmap\message_stream.js:28:9)
    at writeOrBuffer (node:internal/streams/writable:389:12)
    at _write (node:internal/streams/writable:330:10)
    at MessageStream.Writable.write (node:internal/streams/writable:334:10)
    at TLSSocket.ondata (node:internal/streams/readable:754:22)
    at TLSSocket.emit (node:events:394:28)
    at addChunk (node:internal/streams/readable:315:12)
Emitted 'error' event on ChangeStream instance at:
    at closeWithError (C:\project\node_modules\mongodb\lib\change_stream.js:355:22)
    at processError (C:\project\node_modules\mongodb\lib\change_stream.js:445:12)
    at Readable.<anonymous> (C:\project\node_modules\mongodb\lib\change_stream.js:364:33)
    at Readable.emit (node:events:394:28)
    at emitErrorNT (node:internal/streams/destroy:157:8)
    at emitErrorCloseNT (node:internal/streams/destroy:122:3)
    at processTicksAndRejections (node:internal/process/task_queues:83:21) {
  operationTime: Timestamp { low: 1, high: 1629924381, unsigned: true },
  ok: 0,
  code: 13,
  codeName: 'Unauthorized',
  '$clusterTime': {
    clusterTime: Timestamp { low: 1, high: 1629924381, unsigned: true },
    signature: {
      hash: Binary {
        sub_type: 0,
        buffer: Buffer(20) [Uint8Array] [
          104,  40, 136,  32, 223, 251,
          247,  64,  47, 185, 197, 229,
          247,  96, 219, 103, 110, 116,
           65, 183
        ],
        position: 20
      },
      keyId: Long { low: 5, high: 1629063232, unsigned: false }
    }
  }
}

I also have the other code, which occasionally crashes the same way:

queueCollection.watch([{ $match: { operationType: "insert" }}])
    .on("change", (changeEvent) => { 
        console.debug('document added',currentlyProcessing?'currentlyProcessing':'notCurrentlyProcessing');
        if (!currentlyProcessing) processNextQueueItem();
    });

If anyone could explain why this error is happening, or how to catch and fix the watcher when it does, that would be great.


Solution

  • I still don't know how to prevent it, but here's how to hack your way around it:

    //watches the collection for any inserted documents
    function startWatcher (collection) {
        console.log('starting watcher');
        let watcher = collection.watch([{ $match: { operationType: "insert" }}])
            .on("change", (changeEvent) => {
                /*whatever you want to do when event is fired*/
            })
            //changestream connection died (cursor session id is not the same as the operation contexts session id)
            .on('error', e => {
                console.error('watcher died');
    
                //close current connection
                watcher.cursor.close();
    
                //restart new watcher
                startWatcher(collection);
            });
    }
    
    startWatcher(db.collection('collectionname'));