I need a pub/sub event message system with Azure Cosmos DB. I use Azure Cosmos DB Java SDK v4.
I tried a ChangeFeedProcessor based on this sample https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples/blob/main/src/main/java/com/azure/cosmos/examples/changefeed/SampleChangeFeedProcessor.java but it does not work as expected.
My problems:
With version 4.12.0 of the Java SDK, the following code snippet works for me. However, it uses beta code from the driver, which can change in the future.
// Used both as the id of the Cosmos container and as the name of the polling thread.
private static final String CHANNEL = "events";
// Container holding the events; assigned in start() and written to in send().
private CosmosContainer collection;
// Polled by the background thread created in start(). Declared volatile so that a write
// from another thread (presumably a stop/shutdown method outside this snippet) becomes
// visible to the polling loop instead of being cached by it.
private volatile boolean stopped;
/**
 * Obtains the event container and starts a daemon thread that polls its change feed.
 * <p>
 * The container is requested with id {@link #CHANNEL}, the path {@code /type} and a default
 * time-to-live of 60 seconds. The thread reads the change feed starting from "now", remembers
 * the continuation token of each response and uses it for the following poll. Every received
 * event whose {@code client} differs from {@code clientID} is passed to {@code onMessage}.
 * The thread sleeps one second between polls and ends when {@code stopped} is set or when
 * any error occurs; an error is logged unless {@code stopped} is already set.
 *
 * @param clientID identifier of this node; events carrying the same value in
 *            {@code client} are treated as own events and are skipped
 */
void start( String clientID ) {
    CosmosContainerProperties props = new CosmosContainerProperties( CHANNEL, "/type" );
    // delete all events after 60 seconds. All nodes should receive it in the meantime.
    props.setDefaultTimeToLiveInSeconds( 60 );
    collection = getOrCreateContainer( props );
    Thread thread = new Thread( () -> {
        // one-element array so that the response handler lambda can update the token
        String[] continuation = new String[1];
        try {
            while( !stopped ) {
                // sample code: https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
                CosmosChangeFeedRequestOptions options = continuation[0] == null //
                                ? CosmosChangeFeedRequestOptions.createForProcessingFromNow( FeedRange.forFullRange() ) // initial value
                                : CosmosChangeFeedRequestOptions.createForProcessingFromContinuation( continuation[0] ); // continue value
                Iterator<EventPOJO> it = collection //
                                .queryChangeFeed( options, EventPOJO.class ) //
                                .handle( ( response ) -> continuation[0] = response.getContinuationToken() ) //
                                .iterator();
                while( it.hasNext() ) {
                    EventPOJO event = it.next();
                    // filter the own events. Compare by value: the client string of a received
                    // event is a different String instance than clientID, so "!=" is not reliable.
                    // The null-safe comparison also tolerates a missing client on either side.
                    if( !java.util.Objects.equals( event.client, clientID ) ) {
                        onMessage( event );
                    }
                }
                // poll interval
                Thread.sleep( 1000 );
            }
        } catch( InterruptedException ex ) {
            // restore the interrupt flag so that code further up the stack can see it
            Thread.currentThread().interrupt();
            if( !stopped ) {
                PersistenceLogger.LOGGER.error( ex );
            }
        } catch( Throwable th ) {
            // errors after a requested stop are expected and not reported
            if( !stopped ) {
                PersistenceLogger.LOGGER.error( th );
            }
        }
    }, CHANNEL );
    thread.setDaemon( true );
    thread.start();
}
/**
 * Publishes an event by creating a new item in the event container.
 * <p>
 * NOTE(review): {@code collection} is only assigned in {@code start}, so this presumably
 * must not be called before {@code start} - confirm against the callers.
 *
 * @param event the event to publish; its class name is stored as the {@code type} of the item
 * @param clientID identifier of the sending node, stored as the {@code client} of the item
 * @param <T> the type of the event
 */
<T> void send( T event, String clientID ) {
EventPOJO evt = new EventPOJO();
// the id value is elided in this snippet
evt.id = ...
// sender marker; start() compares it with its own clientID to skip own events
evt.client = clientID;
// "/type" is the path the container is created with in start()
evt.type = event.getClass().getName();
// the message value (presumably the serialized event) is elided in this snippet
evt.message = ...
collection.createItem( evt );
}