mongodbchangestream

MongoDB change stream $_internalChangeStreamOplogMatch in the wrong stage


I have code similar to the following:

let stream_pipeline = [
    doc! {
        "$changeStream" : {
            "fullDocument": "updateLookup"
        }
    },
    doc! {
        "$match": {
            "operationType": {
                "$in": ["insert", "update", "replace", "delete"]
            }
        }
    },
];

let stream = collection.watch(stream_pipeline, None).await?;

I'm getting the following error:

$_internalChangeStreamOplogMatch is only valid as the first stage in a pipeline (code 40602)

If I change the order of the two documents in the pipeline, I get the same error message. In fact, it looks like I get the same error whatever I do, as long as the $changeStream document is part of the pipeline, even if it's the only component of the pipeline.

What I want to do is match only inserts, updates, replaces and deletes, and to obtain a full document on update. If I can't $match the operation type, then I'd like to at least get a full document on update.

What am I doing wrong?


I tried both the Rust code above and a similar Node.js implementation, and I get the same error with both drivers.


Solution

  • The {"fullDocument": "updateLookup"} is not part of the pipeline, but an option - the second parameter of the watch. At least in nodejs. In Rust must be something like this:

    let stream_pipeline = [
        doc! {
            "$match": {
                "operationType": {
                    "$in": ["insert", "update", "replace", "delete"]
                }
            }
        },
    ];
    
    let options = ChangeStreamOptions::builder()
                    .full_document(Some(FullDocumentType::UpdateLookup))
                    .build();
    
    let stream = collection.watch(stream_pipeline, options).await?;