TLDR: DynamoDB async query on GSI is returning duplicates when forcibly timing out the query request.
I'm running performance tests on a solution built with CompletionStage and the non-blocking DynamoDbEnhancedAsyncClient and DynamoDbAsyncTable. It follows the AWS write-sharding recommendation, splitting the data across 30 partitions indexed by a GSI.
Because throughput is the second most important feature of the product (thousands of events per second), I'm relying on queries with a consumer to process the pages read from DynamoDB as fast as possible.
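For reference, here's a minimal sketch of what such a sharded item model can look like with the enhanced client's annotations; the class, attribute, and index names are my assumptions, not the real schema:

```java
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey;

@DynamoDbBean
public class ExpiringItem {
    private String id;
    private String shard;   // one of 30 values, e.g. "shard-0" .. "shard-29"
    private long expiresAt; // epoch millis, used as the GSI sort key

    @DynamoDbPartitionKey
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    @DynamoDbSecondaryPartitionKey(indexNames = "expiration-gsi")
    public String getShard() { return shard; }
    public void setShard(String shard) { this.shard = shard; }

    @DynamoDbSecondarySortKey(indexNames = "expiration-gsi")
    public long getExpiresAt() { return expiresAt; }
    public void setExpiresAt(long expiresAt) { this.expiresAt = expiresAt; }
}
```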
Here's the query in the getItemsFromGsiLessThan method. It queries a GSI by its PK and SK.
QueryConditional query = QueryConditional.sortLessThan(key);
QueryEnhancedRequest request = QueryEnhancedRequest.builder()
        .queryConditional(query)
        .limit(25) // at most 25 items per page
        .build();
try {
    return table.index(indexName)
            .query(request)
            .limit(5) // at most 5 pages from the SdkPublisher
            .subscribe(page -> consumer.accept(page.items()));
} catch (Exception e) {
    return CompletableFuture.failedFuture(e);
}
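For context, the Key passed to sortLessThan would be built per shard, roughly like this (the shard value and the timestamp-based sort key are assumptions):

```java
import software.amazon.awssdk.enhanced.dynamodb.Key;

// Query one shard of the GSI for items whose sort key (expiration) is below "now".
Key key = Key.builder()
        .partitionValue("shard-17")             // the GSI shard being drained
        .sortValue(System.currentTimeMillis())  // upper bound for sortLessThan
        .build();
```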
This is the code processing it:
Phaser partitionPhaser = new Phaser(1); // the partition itself is the first registered party
partitionPhasers.put(partition, partitionPhaser);
dynamoDbRepository.getItemsFromGsiLessThan(key, gsiName, this)
        .orTimeout(PROCESSING_TIMEOUT, MILLISECONDS)
        .whenComplete((r, throwable) -> {
            partitionPhaser.arriveAndAwaitAdvance(); // wait for all registered consumers
            partitionHandler.notifyStopProcessing(partition);
            partitionPhaser.arriveAndDeregister(); // release the partition party
            reportPartitionProcessingTime(partition, startProcessingTime);
            verifyAndHandleException(throwable);
        });
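verifyAndHandleException isn't shown here; since orTimeout fails the stage with a java.util.concurrent.TimeoutException, a hypothetical shape (the body and the log field are my guesses, not the actual code) could be:

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

private void verifyAndHandleException(Throwable throwable) {
    if (throwable == null) {
        return; // normal completion, nothing to do
    }
    // unwrap CompletionException if the failure came from an upstream stage
    Throwable cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
    if (cause instanceof TimeoutException) {
        // orTimeout fired: the query pipeline exceeded PROCESSING_TIMEOUT
        log.warn("partition processing timed out");
    } else {
        log.error("partition processing failed", cause);
    }
}
```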
Then there's the consumer `this` that I passed to the getItemsFromGsiLessThan method. The consuming method does:
Phaser partitionPhaser = partitionPhasers.get(partition);
partitionPhaser.register(); // one party per consumer invocation
expiredItemsConsumer.accept(itemsFromPartitionedGsi)
        .thenCompose(processedItems -> dynamoDbRepository.deleteItems(processedItems)
                .orTimeout(PROCESSING_TIMEOUT, MILLISECONDS) // timeout on the deletion only
                .exceptionally(throwable -> {
                    // log something
                    return null;
                }))
        .exceptionally(throwable -> {
            // log something
            return null;
        })
        .whenComplete((r, throwable) -> {
            partitionPhaser.arriveAndDeregister(); // this consumer is done
        });
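deleteItems isn't shown either; a possible shape using the enhanced client's batch API (enhancedClient, table, and ExpiringItem are the assumptions from the sketches above; a batch holds at most 25 delete requests, which matches the page limit of 25) could look like this:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteItemEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.WriteBatch;

public CompletableFuture<Void> deleteItems(List<ExpiringItem> items) {
    WriteBatch.Builder<ExpiringItem> batch = WriteBatch.builder(ExpiringItem.class)
            .mappedTableResource(table); // the DynamoDbAsyncTable<ExpiringItem>
    items.forEach(batch::addDeleteItem);
    return enhancedClient.batchWriteItem(BatchWriteItemEnhancedRequest.builder()
                    .writeBatches(batch.build())
                    .build())
            .thenApply(result -> null);  // unprocessed deletes ignored for brevity
}
```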
I want the second timeout to apply only to the deletion, not to the expired-items consumer. That consumer also performs non-blocking I/O calls to DynamoDB, sometimes taking many seconds for a reason I haven't been able to pin down so far, but I don't want to interrupt it.
So, the sequence of operations is as follows (a toy sketch of the Phaser handshake follows the list):
1. register main phaser
2. get items expired after the previous run -> should run every second
3. call arriveAndAwaitAdvance until the consumers are done
4. for every page of items call the consumer `this`, which calls expiredItemsConsumer, and register the consumer on the Phaser
5. process the items and return the ones processed successfully
6. delete the returned items
7. call arriveAndDeregister by the consumer on the Phaser
8. once all the consumers ended their work, release the partition and call arriveAndDeregister
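Here's that toy sketch, detached from DynamoDB, with a thread pool standing in for the async consumer callbacks; consumers register before they start, which is what keeps arriveAndAwaitAdvance from advancing too early:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserHandshake {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // party #1: the partition itself
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 1; i <= 3; i++) {
            phaser.register();         // register each consumer BEFORE it starts
            int id = i;
            pool.submit(() -> {
                try {
                    System.out.println("Consumer " + id + " started");
                    Thread.sleep(100L * id); // simulate processing one page
                    System.out.println("Consumer " + id + " finished");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    phaser.arriveAndDeregister(); // consumer leaves the phaser
                }
            });
        }
        phaser.arriveAndAwaitAdvance();  // block until every consumer has arrived
        phaser.arriveAndDeregister();    // release the partition party
        System.out.println("Partition processing ended");
        pool.shutdown();
    }
}
```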
With the phaser in place, the normal pattern is for the consumers to finish before the partition is released. Depending on the number of items sent to the consumers, one consumer might take longer than another.
1. Partition processing started
2. Consumer 1 started
3. Consumer 1 finished
4. Consumer 2 started
5. Consumer 3 started
6. Consumer 3 ended
7. Consumer 2 ended
8. Partition processing ended
Now, if you've read this far, here's the problem. Sometimes, 2-3 times during a 15-minute performance test, the consumer receives the same item multiple times; in the worst case I saw a single item delivered 31 times. I think this happens when the .orTimeout(PROCESSING_TIMEOUT, MILLISECONDS) applied after dynamoDbRepository.getItemsFromGsiLessThan(key, gsiName, this) fires. In the logs I see 3-4 occurrences of partition start - consumer timeout - partition end, then a partition start followed by many consumers. Despite having configured a maximum of 5 pages, I saw up to 31 pages being delivered to the consumer, all of them containing the same items. The only explanation I can see is that the publisher keeps sending pages to the consumer despite the timeout, which should have stopped it.
Any ideas or hints are more than appreciated.
According to AWS support, cancelling the CompletableFuture via a timeout doesn't cancel the underlying reactive stream. The solution is to rely on an AtomicBoolean to stop delivering pages to the consumer once the timeout has fired.
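A minimal sketch of that workaround, assuming a new stopped parameter shared between the repository method and its caller (table, ExpiringItem, and the consumer's type are assumptions; the rest is the query from above):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import software.amazon.awssdk.enhanced.dynamodb.Key;
import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional;
import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;

public CompletableFuture<Void> getItemsFromGsiLessThan(Key key, String indexName,
        Consumer<List<ExpiringItem>> consumer, AtomicBoolean stopped) {
    QueryEnhancedRequest request = QueryEnhancedRequest.builder()
            .queryConditional(QueryConditional.sortLessThan(key))
            .limit(25)
            .build();
    try {
        return table.index(indexName)
                .query(request)
                .limit(5)
                .subscribe(page -> {
                    // The SdkPublisher keeps publishing even after the future returned
                    // by subscribe() is failed by orTimeout, so the guard must live
                    // here, on the subscription side.
                    if (!stopped.get()) {
                        consumer.accept(page.items());
                    }
                });
    } catch (Exception e) {
        return CompletableFuture.failedFuture(e);
    }
}
```

On the calling side, the AtomicBoolean is created next to the Phaser, passed into the repository call, and set to true inside the whenComplete that follows orTimeout; pages still in flight are then dropped instead of being re-delivered to the consumer.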