I am writing a WebSocket handler on my backend that uses a ChangeStream
to track data changes in my MongoDB collection. I have this code with a simple pipeline that tries to match records with a certain value in the author.id
field:
async fn websocket(socket: WebSocket, state: State<AppState>, user_id: ObjectId) {
    let (mut sender, _receiver) = socket.split();
    let pipeline = vec![doc! {
        "$match": {
            "author.id": user_id
        }
    }];
    let change_stream = state
        .db
        .posts_collection
        .watch(pipeline, None)
        .await
        .map_err(|err| {
            eprintln!("Error creating change stream: {:?}", err);
            "Failed to create change stream".to_string()
        });
With this pipeline no changes are reported, but if I pass None as the pipeline
it reports every change on the collection. How do I create a ChangeStream
that tracks changes only to my matching records?
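To track operations on the collection's records (insert, update, delete) for specific field values, you have to adjust the pipeline and add options. The pipeline is applied to change events rather than to the stored documents, so the document's fields sit under fullDocument (or fullDocumentBeforeChange), not at the top level of the event: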
let pipeline = vec![doc! {
    "$match": {
        "$or": [
            // insert (also matches updates, because UpdateLookup attaches the current document)
            {"fullDocument.author.id": user_id},
            // delete
            {"fullDocumentBeforeChange.author.id": user_id},
            // update
            {"updateDescription.updatedFields.author.id": user_id},
        ]
    }
}];
let options = ChangeStreamOptions::builder()
    .full_document(Some(FullDocumentType::UpdateLookup))
    // full_document_before_change returns the pre-image of the record (its data before the change; in my case the change I care about is deletion of the record)
    // to use full_document_before_change, make sure to enable the pre-image option when creating the collection in your code
    .full_document_before_change(Some(
        mongodb::options::FullDocumentBeforeChangeType::WhenAvailable,
    ))
    .build();
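To make the three `$or` branches more concrete, here is a small plain-Rust model of the matching logic. It does not use the MongoDB driver at all: `ChangeEventModel` and `matches_user` are hypothetical names invented for this sketch, and a `u32` stands in for the `ObjectId`. It only illustrates which part of a change event carries `author.id` for each kind of operation, and why a delete cannot match unless a pre-image is available.

```rust
/// Simplified stand-in for the parts of a change event that the `$match`
/// stage looks at. Hypothetical type for illustration only; the driver's
/// real change event type carries much more information.
#[derive(Debug, Default)]
struct ChangeEventModel {
    /// Value at `fullDocument.author.id` (present on inserts, and on
    /// updates when the stream uses UpdateLookup).
    full_document_author: Option<u32>,
    /// Value at `fullDocumentBeforeChange.author.id` (present only when
    /// pre-images are enabled on the collection and requested).
    before_change_author: Option<u32>,
    /// Value at `updateDescription.updatedFields.author.id` (present only
    /// when the update itself wrote that field).
    updated_fields_author: Option<u32>,
}

/// Mirrors the `$or`: the event matches if any of the three locations
/// holds the requested user id.
fn matches_user(event: &ChangeEventModel, user_id: u32) -> bool {
    [
        event.full_document_author,
        event.before_change_author,
        event.updated_fields_author,
    ]
    .contains(&Some(user_id))
}
```

In this model an insert event has only `full_document_author` set, while a delete event has nothing except `before_change_author`. A delete on a collection without pre-images therefore has all three fields empty and never matches, which is why the collection needs the pre-image option.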
Example of enabling the pre-image and post-image options on a collection (works only on MongoDB v6.0 or later):
let enable = ChangeStreamPreAndPostImages::builder()
    .enabled(true)
    .build();
let opts = CreateCollectionOptions::builder()
    .change_stream_pre_and_post_images(enable)
    .build();
let create_posts_collection = database
    .create_collection(&posts_collection_name, opts)
    .await;
create_posts_collection.map_err(|err| {
    eprintln!(
        "Error creating posts collection with change stream options: {}",
        err
    );
    (
        StatusCode::INTERNAL_SERVER_ERROR,
        Json(OperationStatusResponse {
            success: false,
            error_message: Some(format!(
                "Failed to create MongoDB collection: {}",
                err.to_string()
            )),
        }),
    )
})?;
let posts_collection = database.collection::<Post>(&posts_collection_name);