I have a service that spawns off a number of change feeds to monitor a number of different Cosmos DB collections. In v1 or v2, the ChangefeedObserver class included the ChangefeedObserverContext, from which I could extract the collection name.
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> deltas, CancellationToken cancellationToken)
{
string observerCollection = string.Empty;
try
{
Regex rx = new Regex(@"\b(\w+$)");
observerCollection = rx.Match(context.FeedResponse.ContentLocation).Value.ToString();
In v3, instead of passing a type as your processor, you pass it a delegate method whose signature no longer contains the context.
MS Docs Container.ChangesHandlerDelegate
/// <summary>
/// Builds a change feed processor over <paramref name="monitoringContainer"/> that
/// dispatches every batch of changes to <c>HandleChangesAsync</c>.
/// </summary>
/// <param name="leasingContainer">Container handed to <c>WithLeaseContainer</c>.</param>
/// <param name="monitoringContainer">Container the processor builder is obtained from.</param>
/// <param name="hostName">Value passed as the first argument of <c>GetChangeFeedProcessorBuilder</c>.</param>
/// <returns>The built processor; this method does not start it.</returns>
private ChangeFeedProcessor ChangeFeedInitialize(Container leasingContainer, Container monitoringContainer, string hostName)
{
    // The handler is passed as a method group, so it only receives what the
    // delegate signature supplies: the changes and a cancellation token.
    return monitoringContainer
        .GetChangeFeedProcessorBuilder<Document>(hostName, this.HandleChangesAsync)
        .WithInstanceName("isn")
        .WithLeaseContainer(leasingContainer)
        .Build();
}
/// <summary>
/// Change feed delegate: forwards the received batch to a new
/// <c>AnalyticsChangefeedProcessor</c>. Any exception is logged and not rethrown.
/// </summary>
/// <param name="changes">The documents delivered by the change feed.</param>
/// <param name="cancellationToken">Token supplied by the caller; not consulted here.</param>
private async Task HandleChangesAsync(IReadOnlyCollection<Document> changes, CancellationToken cancellationToken)
{
    ILogger log = AnalyticsHelper.BuildMeMyLogger(this.loggerFactory);
    try
    {
        var processor = new AnalyticsChangefeedProcessor();

        // The collection name is a fixed placeholder because this delegate
        // signature carries no information about the monitored container.
        await processor.HandleChangesAsync(
            changes,
            this.analyticsContext.DataLakeStorageProvider,
            "CollectionName",
            log);
    }
    catch (Exception failure)
    {
        log.LogFailure($"Failed to process changes: {failure.Message}", TagIds.ExceptionAnalytics, failure);
    }
}
In the above code, I have a basic method that creates the change feed (which gets started via a timer) and the delegate method that sends the processing off to a larger class to take actions, depending on the monitored collection.
So, how can I get this change feed's monitored collection value into the ChangesHandler?
You already have the reference to the monitored container, so you can inject it into the handler or reference it directly:
/// <summary>
/// Builds a change feed processor over <paramref name="monitoringContainer"/> whose
/// handler also receives that container.
/// </summary>
/// <param name="leasingContainer">Container handed to <c>WithLeaseContainer</c>.</param>
/// <param name="monitoringContainer">Container the processor builder is obtained from; also forwarded to the handler.</param>
/// <param name="hostName">Value passed as the first argument of <c>GetChangeFeedProcessorBuilder</c>.</param>
/// <returns>The built processor; this method does not start it.</returns>
private ChangeFeedProcessor ChangeFeedInitialize(Container leasingContainer, Container monitoringContainer, string hostName)
{
    // The lambda closes over monitoringContainer, which is how the handler
    // learns which container the changes came from even though the delegate
    // signature itself only carries the changes and the token.
    return monitoringContainer
        .GetChangeFeedProcessorBuilder<Document>(
            hostName,
            (IReadOnlyCollection<Document> batch, CancellationToken token) =>
                this.HandleChangesAsync(monitoringContainer, batch, token))
        .WithInstanceName("isn")
        .WithLeaseContainer(leasingContainer)
        .Build();
}
/// <summary>
/// Handles a batch of change feed documents for a specific monitored container by
/// forwarding it to a new <c>AnalyticsChangefeedProcessor</c>. Any exception is
/// logged and not rethrown.
/// </summary>
/// <param name="monitoringContainer">The container the changes came from; its <c>Id</c> is passed on as the collection name.</param>
/// <param name="changes">The documents delivered by the change feed.</param>
/// <param name="cancellationToken">Token supplied by the caller; not consulted here.</param>
private async Task HandleChangesAsync(Container monitoringContainer, IReadOnlyCollection<Document> changes, CancellationToken cancellationToken)
{
    ILogger logger = AnalyticsHelper.BuildMeMyLogger(this.loggerFactory);
    try
    {
        AnalyticsChangefeedProcessor changefeedProcessor = new AnalyticsChangefeedProcessor();

        // Use the injected container's Id instead of a hard-coded placeholder so
        // the downstream processor knows which collection produced the batch.
        // Read inside the try so a null container is logged like any other failure.
        await changefeedProcessor.HandleChangesAsync(
            changes,
            this.analyticsContext.DataLakeStorageProvider,
            monitoringContainer.Id,
            logger);
    }
    catch (Exception ex)
    {
        logger.LogFailure($"Failed to process changes: {ex.Message}", TagIds.ExceptionAnalytics, ex);
    }
}