
Change Feed Processor Issue


I am trying to test the change feed processor in .NET. I have configured the processor to read the change feed from the beginning (as described in the documentation). When I start the processor, it runs as expected: if I make a change in Cosmos DB, it triggers the HandleChangesAsync method. I then wanted to test one scenario: I stopped my change feed processor locally, made 2 changes in Cosmos DB, and started the processor again. This time the processor picked up only the latest change. Why is this? Am I missing something in the code?

This is my code:

public class ChangeFeedListener 
{
    private static CosmosClient _cosmosClient;
    private static Database _productDatabase;
    private static Container _productContainer;
    private static Container _productLeaseContainer;
    private IAuditMessenger _auditMessenger = null;
    private TelemetryClient _telemetryClient = null;

    public ChangeFeedListener(IAuditMessenger auditMessenger, TelemetryClient telemetryClient)
    {
        _auditMessenger = auditMessenger;
        _telemetryClient = telemetryClient;
    }

    public async Task StartListener(CancellationToken cancellationToken)
    {
        _cosmosClient = new CosmosClient(Config.CosmosConfig.ConnectionString);
        _productDatabase = _cosmosClient.GetDatabase(Config.CosmosConfig.CosmosDB);
        _productContainer = _productDatabase.GetContainer(Config.CosmosConfig.TriggerContainer);
        _productLeaseContainer = _productDatabase.GetContainer(Config.CosmosConfig.LeaseContainer);

        await StartChangeFeedProcessorAsync(_cosmosClient);
    }

    private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(CosmosClient cosmosClient)
    {
        Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
        {
            if (exception is ChangeFeedProcessorUserException userException)
            {
                //handle user exception
            }
            else
            {
                //handle other exceptions
            }

            return Task.CompletedTask;
        };

        Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);
        string processorName = "abc";
        string instanceName = "test";

        ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(_productDatabase.Id, _productContainer.Id)
            .GetChangeFeedProcessorBuilder<ABCItem>(processorName, HandleChangesAsync)
            .WithErrorNotification(onErrorAsync)
            .WithInstanceName(instanceName)
            .WithLeaseContainer(leaseContainer)
            .WithStartTime(DateTime.MinValue.ToUniversalTime())
            .Build();

        await changeFeedProcessor.StartAsync();
        return changeFeedProcessor;
    }

    private async Task HandleChangesAsync(IReadOnlyCollection<ABCItem> changes, CancellationToken cancellationToken)
    {
        //handler code
    }
}

Solution

  • Since I was using Service Fabric Explorer, the listener continued to run in the background even after I stopped local debugging, so the earlier change had already been processed there. There was no data loss and the processor worked fine. I realized this when I saw that the continuation token and its TS kept changing even after I had stopped the listener.
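
A related point in the code above: StartListener discards the ChangeFeedProcessor returned by StartChangeFeedProcessorAsync and never uses its CancellationToken, so nothing in the class can stop the processor. A minimal sketch of tying the processor's lifetime to the token is shown below. The class name ChangeFeedProcessorLifetime and the method RunUntilCancelledAsync are hypothetical names introduced here for illustration; StartAsync and StopAsync are the SDK's ChangeFeedProcessor methods. This only helps when the host actually signals cancellation. It would not stop a service instance that is still deployed and running in a local cluster.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helper (not part of the original code or the SDK).
public static class ChangeFeedProcessorLifetime
{
    // Starts the processor, waits until the token is cancelled,
    // then stops the processor so it stops acquiring and renewing leases.
    public static async Task RunUntilCancelledAsync(
        ChangeFeedProcessor processor,
        CancellationToken cancellationToken)
    {
        await processor.StartAsync();
        try
        {
            // Completes only by cancellation, which surfaces as an exception.
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host requests shutdown.
        }
        finally
        {
            await processor.StopAsync();
        }
    }
}
```

Assuming StartListener were changed to keep the returned processor, it could end with `await ChangeFeedProcessorLifetime.RunUntilCancelledAsync(processor, cancellationToken);` in place of the bare start call.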