Is there a way to make AMQP reactive messaging in Quarkus preserve the order of events within message groups?
My implementation is below. I can see that the inbox-events subscriber can observe multiple events from the same message group at once, which is not what I expected.
In my opinion, the next message in a group should be delivered only after the previous one has been acknowledged.
Consumer
@Channel("inbox-events") Multi<Long> inboxEvents;
...
inboxEvents
    .onItem().transformToUniAndMerge(this::handleEvent)
    .subscribe()
    .with(eventId -> {
        log.info("Event was handled, id={}", eventId);
    });
Producer
@Channel("outbox-events") MutinyEmitter<Long> outboxEventsEmitter;
...
final var metadata = OutgoingAmqpMetadata.builder()
    .withGroupId(String.valueOf(groupId))
    .build();
final var message = Message.of(id)
    .addMetadata(metadata);
return outboxEventsEmitter.sendMessage(message);
application.properties
mp.messaging.incoming.inbox-events.connector=smallrye-amqp
mp.messaging.incoming.inbox-events.address=DISPATCHING
mp.messaging.outgoing.outbox-events.connector=smallrye-amqp
mp.messaging.outgoing.outbox-events.durable=true
mp.messaging.outgoing.outbox-events.address=DISPATCHING
The AMQP clients do not provide any parallelism by message group id out of the box.
You therefore need to dispatch event processing to a worker thread pool manually in order to process events in parallel. The Mutiny APIs support this kind of operation. You can combine that with the KeyedMulti support (available since Reactive Messaging 4.6.0) to group messages by group id.
Here is some sample code:
@Incoming("data")
@Outgoing("sink")
Multi<Integer> process(KeyedMulti<String, Integer> group) {
    return group
        // move the processing of this group onto a worker thread
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
        .map(i -> {
            String id = group.key(); // the group id shared by every item of this KeyedMulti
            try {
                Thread.sleep(20L * i); // simulate blocking work
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return i + 1;
        });
}

@ApplicationScoped
static class AmqpKeyValueExtractor implements KeyValueExtractor {
    @Override
    public boolean canExtract(Message<?> first, Type keyType, Type valueType) {
        // only handle messages that carry AMQP metadata
        return first.getMetadata(IncomingAmqpMetadata.class).isPresent();
    }

    @Override
    public Object extractKey(Message<?> message, Type keyType) {
        // the AMQP group id becomes the key of the KeyedMulti
        return message.getMetadata(IncomingAmqpMetadata.class)
            .map(IncomingAmqpMetadata::getGroupId)
            .orElse(null);
    }

    @Override
    public Object extractValue(Message<?> message, Type valueType) {
        return message.getPayload();
    }
}
The KeyValueExtractor is self-explanatory: for each incoming message, the code extracts a key and a value to be used in the KeyedMulti.
The process method is called for each new group id, and it uses runSubscriptionOn to run the processing on a worker thread pool (you cannot use the @Blocking annotation in this scenario).
You'll notice that message order is preserved within each group, while processing happens concurrently across groups.
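To make that ordering guarantee concrete outside of the framework, here is a rough JDK-only sketch of the same idea: tasks that share a key are chained one after another, while different keys run in parallel on a pool. This is not the SmallRye implementation, and KeyedSequentialDispatcher, dispatch and demo are hypothetical names made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of Quarkus or SmallRye: it only illustrates
// "sequential inside a group, concurrent across groups".
class KeyedSequentialDispatcher {
    // Last scheduled task per key; a new task is chained behind it.
    // Sketch only: entries are never removed from this map.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    KeyedSequentialDispatcher(ExecutorService pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> dispatch(String key, Runnable task) {
        // compute() is atomic per key, so two callers cannot chain behind the same tail.
        return tails.compute(key, (k, tail) ->
            (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                .exceptionally(error -> null)   // a failed task must not block its group
                .thenRunAsync(task, pool));     // runs only after the previous task of this key
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Dispatches events 0..4 for groups "A" and "B" and records the order in which they were handled.
    static Map<String, List<Integer>> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyedSequentialDispatcher dispatcher = new KeyedSequentialDispatcher(pool);
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            final int eventId = i;
            for (String group : List.of("A", "B")) {
                pending.add(dispatcher.dispatch(group, () -> {
                    // Earlier events take longer, so unordered handling would be visible.
                    sleep((5 - eventId) * 5L);
                    seen.computeIfAbsent(group, g -> new CopyOnWriteArrayList<>()).add(eventId);
                }));
            }
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        demo().forEach((group, ids) -> System.out.println(group + " -> " + ids));
    }
}
```

Within each group the recorded ids come out in dispatch order even though earlier events sleep longer, and groups "A" and "B" make progress on different pool threads. In the Quarkus version above, the KeyedMulti together with runSubscriptionOn plays the role of this per-key chaining.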