I have code similar to the following:
let stream_pipeline = [
doc! {
"$changeStream" : {
"fullDocument": "updateLookup"
}
},
doc! {
"$match": {
"operationType": {
"$in": ["insert", "update", "replace", "delete"]
}
}
},
];
let stream = collection.watch(stream_pipeline, None).await?;
I'm getting the following error:
$_internalChangeStreamOplogMatch
is only valid as the first stage in a pipeline (code 40602)
If I change the order of the two documents in the pipeline, I get the same error message. In fact, it looks like I get the same error whatever I do, as long as the $changeStream
document is part of the pipeline, even if it's the only component of the pipeline.
What I want to do is match only inserts, updates, replaces and deletes, and to obtain a full document on update. If I can't $match
the operation type, then I'd like to at least get a full document on update.
What am I doing wrong?
I tried both the Rust code above and a similar Node.js implementation, and I get the same error with both drivers.
The {"fullDocument": "updateLookup"}
is not part of the pipeline, but an option - the second parameter of the watch
. At least in nodejs. In Rust must be something like this:
let stream_pipeline = [
doc! {
"$match": {
"operationType": {
"$in": ["insert", "update", "replace", "delete"]
}
}
},
];
let options = ChangeStreamOptions::builder()
.full_document(Some(FullDocumentType::UpdateLookup))
.build();
let stream = collection.watch(stream_pipeline, options).await?;