mongodbspring-data-mongodbmongo-java-driverchangestream

Mongo change-Stream with Spring resumeAt vs startAfter and fault tolerance in case of connection loss


Can't find an answer on stackOverflow, nor in any documentation, I have the following change stream code(listen to a DB not a specific collection)

Mongo Version is 4.2

@Configuration
public class DatabaseChangeStreamListener {

//Constructor, fields etc...

    @PostConstruct
    public void initialize() {
        MessageListenerContainer container = new DefaultMessageListenerContainer(mongoTemplate, new SimpleAsyncTaskExecutor(), this::onException);
        ChangeStreamRequest.ChangeStreamRequestOptions options =
                new ChangeStreamRequest.ChangeStreamRequestOptions(mongoTemplate.getDb().getName(), null, buildChangeStreamOptions());
        container.register(new ChangeStreamRequest<>(this::onDatabaseChangedEvent, options), Document.class);
        container.start();
    }

    private ChangeStreamOptions buildChangeStreamOptions() {
        return ChangeStreamOptions.builder()
                .returnFullDocumentOnUpdate()
                .filter(newAggregation(match(where(OPERATION_TYPE).in(INSERT.getValue(), UPDATE.getValue(), REPLACE.getValue(), DELETE.getValue()))))
                .resumeAt(Instant.now().minusSeconds(1))
                .build();
    }
//more code
}

I want the stream to start listening from system initiation time only, without taking anything prior in the op-log, will .resumeAt(Instant.now().minusSeconds(1)) work? do I need to use starAfter method if so how can I found the latest resumeToken in the db? or is it ready out of the box and I don't need to add any resume/start lines?

second question, I never stop the container(it should always live while app is running), In case of disconnection from the mongoDB and reconnection will the listener in current configuration continue to consume messages? (I am having a hard time simulation DB disconnection)

If it will not resume handling events, what do I need to change in the configuration so that the change stream will continue and will take all the event from the last received resumeToken prior to the disconnection? I have read this great article on medium change stream in prodcution, but it uses the cursor directly, and I want to use the spring DefaultMessageListenerContainer, as it is much more elegant.


Solution

  • So I will answer my own(some more dumb, some less :)...) questions:

    1. when no resumeAt timestamp provided the change stream will start from current time, and will not draw any previous events.
    2. resumeAfter event vs timestamp difference can be found here: stackOverflow answer but keep in mind, that for timestamp it is inclusive of the event, so if you want to start from next event(in java) do:
        private BsonTimestamp getNextEventTimestamp(BsonTimestamp timestamp) {
            return new BsonTimestamp(timestamp.getValue() + 1);
        }
    
    1. In case of internet disconnection the change stream will not resume, as such I recommend to take following approach in case of error:
        private void onException() {
            ScheduledExecutorService executorService = newSingleThreadScheduledExecutor();
            executorService.scheduleAtFixedRate(() -> recreateChangeStream(executorService), 0, 1, TimeUnit.SECONDS);
        }
    
        private void recreateChangeStream(ScheduledExecutorService executorService) {
            try {
                mongoTemplate.getDb().runCommand(new BasicDBObject("ping", "1"));
                container.stop();
                startNewContainer();
                executorService.shutdown();
            } catch (Exception ignored) {
            }
        }
    

    First I am creating a runnable scheduled task that always runs(but only 1 at a time newSingleThreadScheduledExecutor()), I am trying to ping the DB, after a successful ping I am stopping the old container and starting a new one, you can also pass the last timestamp you took so that you can get all events you might have missed

    timestamp retrieval from event:

    BsonTimestamp resumeAtTimestamp = changeStreamDocument.getClusterTime();
    

    then I am shutting down the task.

    also make sure the resumeAtTimestamp exist in oplog...