I am trying to test change feed processor in .net. I have tried to use the change feed processor from the beginning (as mentioned in documentation). When I start the change feed processor, it is running as expected, if I make a change in COSMOS db, it is triggering HandleChanges method. I wanted to test one scenario: I have stopped my change feed processor locally, made 2 changes to cosmos db and started the processor, this time the processor was only picking the latest change. Why is this? Am I missing something in the code?
This is my code:
public class ChangeFeedListener
{
private static CosmosClient _cosmosClient;
private static Database _productDatabase;
private static Container _productContainer;
private static Container _productLeaseContainer;
private IAuditMessenger _auditMessenger = null;
private TelemetryClient _telemetryClient = null;
public ChangeFeedListener(IAuditMessenger auditMessenger, TelemetryClient telemetryClient)
{
_auditMessenger = auditMessenger;
_telemetryClient = telemetryClient;
}
public async Task StartListener(CancellationToken cancellationToken)
{
_cosmosClient = new CosmosClient(Config.CosmosConfig.ConnectionString);
_productDatabase = _cosmosClient.GetDatabase(Config.CosmosConfig.CosmosDB);
_productContainer = _productDatabase.GetContainer(Config.CosmosConfig.TriggerContainer);
_productLeaseContainer = _productDatabase.GetContainer(Config.CosmosConfig.LeaseContainer);
await StartChangeFeedProcessorAsync(_cosmosClient);
}
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(CosmosClient cosmosClient)
{
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
//handle user exception
}
else
{
//handle other exceptions
}
return Task.CompletedTask;
};
Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);
string processorName = "abc";
string instanceName = "test";
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(_productDatabase.Id, _productContainer.Id)
.GetChangeFeedProcessorBuilder<ABCItem>(processorName, HandleChangesAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName(instanceName)
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
await changeFeedProcessor.StartAsync();
return changeFeedProcessor;
}
private async Task HandleChangesAsync(IReadOnlyCollection<ABCItem> changes, CancellationToken cancellationToken)
{
//handler code
}
}
As I was using service fabric explorer, even after I stopped local debugging, the listener continued to run in the background, hence it was processing them in the background. There was no data loss and it worked fine. I realized this when the continuation token with TS seemed to changed even after I stopped the listener