java, azure-cosmosdb, azure-cosmosdb-sqlapi, azure-java-sdk, azure-cosmosdb-changefeed

Sample for publish/subscribe with Azure Cosmos DB in Java


I need a pub/sub event messaging system built on Azure Cosmos DB. I am using the Azure Cosmos DB Java SDK v4.

I tried a ChangeFeedProcessor based on this sample https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeed/SampleChangeFeedProcessor.java but it does not work as expected.

My problems:


Solution

  • With version 4.12.0 of the Java SDK, the following code snippet works for me. However, it uses beta APIs from the SDK, which may change in the future.

    private static final String                CHANNEL = "events";
    
    private CosmosContainer                    collection;
    
    private volatile boolean                   stopped; // volatile: read by the polling thread, set from elsewhere
    
    void start( String clientID ) {
        CosmosContainerProperties props = new CosmosContainerProperties( CHANNEL, "/type" );
        // Delete each event after 60 seconds; every node should have received it by then.
        props.setDefaultTimeToLiveInSeconds( 60 );
        collection = getOrCreateContainer( props );
        Thread thread = new Thread( () -> {
            String[] continuation = new String[1];
            try {
                while( !stopped ) {
                    // sample code: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
                    CosmosChangeFeedRequestOptions options = continuation[0] == null ? //
                    CosmosChangeFeedRequestOptions.createForProcessingFromNow( FeedRange.forFullRange() ) : // initial value
                    CosmosChangeFeedRequestOptions.createForProcessingFromContinuation( continuation[0] ); // continue value
                    Iterator<EventPOJO> it = collection //
                                    .queryChangeFeed( options, EventPOJO.class ) //
                                    .handle( ( response ) -> continuation[0] = response.getContinuationToken() ) //
                                    .iterator();
                    while( it.hasNext() ) {
                        EventPOJO event = it.next();
                        if( !clientID.equals( event.client ) ) {
                            // skip this client's own events; equals() compares content, != only compares references
                            onMessage( event );
                        }
                    }
                    // poll interval
                    Thread.sleep( 1000 );
                }
            } catch( Throwable th ) {
                if( !stopped ) {
                    PersistenceLogger.LOGGER.error( th );
                }
            }
        }, CHANNEL );
        thread.setDaemon( true );
        thread.start();
    }
    
    <T> void send( T event, String clientID ) {
        EventPOJO evt = new EventPOJO();
        evt.id = ...
        evt.client = clientID;
        evt.type = event.getClass().getName();
        evt.message = ...
    
        collection.createItem( evt );
    }
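
To make the polling pattern above easier to follow without a Cosmos DB account, here is a minimal in-memory sketch of the same idea: start "from now", read everything after the stored continuation token, then keep the new token for the next poll. The names InMemoryFeed, Page and pollOnce are hypothetical stand-ins I made up for illustration; they are not part of the Azure SDK, and the real continuation token is an opaque string, not a list index.

```java
import java.util.ArrayList;
import java.util.List;

public class ContinuationPollingSketch {

    /** Hypothetical in-memory stand-in for a change feed; not an Azure SDK class. */
    static final class InMemoryFeed {
        private final List<String> events = new ArrayList<>();

        synchronized void publish(String event) {
            events.add(event);
        }

        /** Token pointing just past the newest event, i.e. "process from now". */
        synchronized String tokenFromNow() {
            return String.valueOf(events.size());
        }

        /** Returns all events after the token plus the token to use next time. */
        synchronized Page readFrom(String token) {
            int start = Integer.parseInt(token);
            List<String> batch = new ArrayList<>(events.subList(start, events.size()));
            return new Page(batch, String.valueOf(events.size()));
        }
    }

    /** One batch of events together with its continuation token. */
    static final class Page {
        final List<String> items;
        final String continuation;

        Page(List<String> items, String continuation) {
            this.items = items;
            this.continuation = continuation;
        }
    }

    /** One poll cycle: first call starts from now, later calls resume from the token. */
    static List<String> pollOnce(InMemoryFeed feed, String[] continuation) {
        if (continuation[0] == null) {
            continuation[0] = feed.tokenFromNow(); // initial value
        }
        Page page = feed.readFrom(continuation[0]);
        continuation[0] = page.continuation;       // continue value for the next poll
        return page.items;
    }

    public static void main(String[] args) {
        InMemoryFeed feed = new InMemoryFeed();
        String[] continuation = new String[1];

        feed.publish("old");                                  // published before the first poll
        System.out.println(pollOnce(feed, continuation));     // prints []

        feed.publish("a");
        feed.publish("b");
        System.out.println(pollOnce(feed, continuation));     // prints [a, b]
        System.out.println(pollOnce(feed, continuation));     // prints []
    }
}
```

The first poll returns nothing because it only fixes the starting point, which mirrors createForProcessingFromNow in the snippet above; events published before that point are never delivered to this subscriber.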