Tags: azure-cosmosdb, azure-cosmosdb-changefeed

How to process different partition ranges in parallel with the Cosmos change feed (push model)?


The document below explains that, within a deployment unit, different instances can process different partition key ranges.

"change feed processor is assigning different ranges to each instance"

Source: https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/change-feed-processor?tabs=dotnet#components-of-the-change-feed-processor

However, there is no API for specifying a partition range when creating an instance:

ChangeFeedProcessor changeFeedProcessor = cosmosClient
    .GetContainer(databaseName, sourceContainerName)
    .GetChangeFeedProcessorBuilder<ToDoItem>(
        processorName: "changeFeedSample",
        onChangesDelegate: HandleChangesAsync)
    .WithInstanceName("consoleHost")
    .WithLeaseContainer(leaseContainer)
    .Build();

Is this supported in the push model? I can see there is a way to do this in the pull model.
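For reference, this is what the pull model offers: it exposes the container's feed ranges (one per physical partition), and each range can be read with its own iterator, potentially on a different worker. A minimal sketch, assuming the same `cosmosClient`, container names, and `ToDoItem` type as above (use `ChangeFeedMode.Incremental` on older SDK versions):

```csharp
Container container = cosmosClient.GetContainer(databaseName, sourceContainerName);

// One FeedRange per physical partition of the monitored container.
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

foreach (FeedRange range in ranges)
{
    // Each range gets its own iterator; these loops could run on separate workers.
    FeedIterator<ToDoItem> iterator = container.GetChangeFeedIterator<ToDoItem>(
        ChangeFeedStartFrom.Beginning(range),
        ChangeFeedMode.LatestVersion);

    while (iterator.HasMoreResults)
    {
        FeedResponse<ToDoItem> response = await iterator.ReadNextAsync();

        // 304 NotModified means no new changes for this range right now.
        if (response.StatusCode == HttpStatusCode.NotModified)
        {
            break;
        }

        foreach (ToDoItem item in response)
        {
            Console.WriteLine($"Change in range {range}: {item.id}");
        }
    }
}
```

The push model has no equivalent builder option; range assignment is handled internally via leases, as the answer below explains.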

I tried this with the emulator: I created items with different partition key values and ran two consumers (instances of the same processor).

Expected: different consumers are notified for different partition key values.

Actual: only one consumer receives all the changes. This will not scale.


Solution

  • The reference document mentions:

    We see that the partition key values are distributed in ranges (each range representing a physical partition) that contain items.

    Each range is being read in parallel and its progress is maintained separately from other ranges in the lease container through a lease document.

So the number of leases depends on the number of physical partitions in your container.

Then, in the section on dynamic scaling:

    If these three conditions apply, then the change feed processor will distribute all the leases in the lease container across all running instances of that deployment unit and parallelize compute using an equal distribution algorithm. One lease can only be owned by one instance at a given time, so the number of instances should not be greater than the number of leases.

So the size of the container determines the number of leases, and the number of leases caps the number of machines you can parallelize the work across. A single machine can own multiple leases, and each lease drives an independent parallel process. The reason to scale to multiple machines is when CPU becomes a bottleneck, but the maximum number of machines is bounded by the number of leases, which in turn depends on the size of the container.
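Concretely, scaling out the push model means running the same processor definition (same `processorName`, same lease container) on each host, varying only the instance name. The processor then distributes leases across the running instances automatically. A sketch, reusing the `HandleChangesAsync` delegate and names from the question (the machine name is just one convenient way to get a unique instance name):

```csharp
// Run this same code on host A and host B; only WithInstanceName differs.
// Both instances belong to the same deployment unit ("changeFeedSample")
// and share the lease container, so leases are split between them.
ChangeFeedProcessor processor = cosmosClient
    .GetContainer(databaseName, sourceContainerName)
    .GetChangeFeedProcessorBuilder<ToDoItem>(
        processorName: "changeFeedSample",        // same on every host
        onChangesDelegate: HandleChangesAsync)
    .WithInstanceName(Environment.MachineName)    // unique per host
    .WithLeaseContainer(leaseContainer)
    .Build();

await processor.StartAsync();
```

With only one physical partition there is only one lease, so the second instance stays idle no matter how it is configured.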

    Also:

    Moreover, the change feed processor can dynamically adjust to containers scale due to throughput or storage increases. When your container grows, the change feed processor transparently handles these scenarios by dynamically increasing the leases and distributing the new leases among existing instances.

When the container grows, new leases appear, increasing the potential parallelism.

Your tests are likely showing activity on a single instance because your monitored container has only one physical partition. If you are using the emulator, you can create a 15,000 RU/s container, which will have multiple physical partitions.
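To reproduce this on the emulator, provision the monitored container above 10,000 RU/s so it is split across several physical partitions. A sketch, assuming the database/container names from the question and a hypothetical `/pk` partition key path:

```csharp
// A single physical partition serves at most 10,000 RU/s, so provisioning
// 15,000 RU/s forces the container onto multiple physical partitions,
// which yields multiple leases for the change feed processor to distribute.
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
Container monitored = await database.CreateContainerIfNotExistsAsync(
    new ContainerProperties(sourceContainerName, partitionKeyPath: "/pk"),
    throughput: 15000);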