quarkus amqp smallrye-reactive-messaging

How to ensure the order of events in message groups with AMQP Reactive Messaging and Quarkus?


Is there a way to make AMQP Reactive Messaging and Quarkus preserve the order of events within a message group?

My implementation is shown below. I can see that the inbox-events subscriber receives several events from the same message group at once, which is not what I expected.

In my opinion, the next message in a group should be delivered only after the previous one has been acknowledged.

Consumer

@Channel("inbox-events") Multi<Long> inboxEvents;
...
inboxEvents
    .onItem().transformToUniAndMerge(this::handleEvent)
    .subscribe()
    .with(eventId -> {
        log.info("Event was handled, id={}", eventId);
    });

Producer

@Channel("outbox-events") MutinyEmitter<Long> outboxEventsEmitter;
...
final var metadata = OutgoingAmqpMetadata.builder()
    .withGroupId(String.valueOf(groupId))
    .build();
final var message = Message.of(id)
    .addMetadata(metadata);
return outboxEventsEmitter.sendMessage(message);

application.properties

mp.messaging.incoming.inbox-events.connector=smallrye-amqp
mp.messaging.incoming.inbox-events.address=DISPATCHING
mp.messaging.outgoing.outbox-events.connector=smallrye-amqp
mp.messaging.outgoing.outbox-events.durable=true
mp.messaging.outgoing.outbox-events.address=DISPATCHING

Solution

  • The AMQP clients do not provide any parallelism by message group id out of the box.

    You therefore need to dispatch event processing to a worker thread pool yourself to process events in parallel. The Mutiny APIs support this kind of operation. You can combine it with the KeyedMulti support (available since Reactive Messaging 4.6.0) to group messages by group id.

    Here is some sample code:

        @Incoming("data")
        @Outgoing("sink")
        Multi<Integer> process(KeyedMulti<String, Integer> group) {
            return group
                    .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
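                    .map(i -> {
                        // Group id of this stream; unused here, shown for illustration.
                        String id = group.key();
                        try {
                            // Simulate slow, blocking work.
                            Thread.sleep(20L * i);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag before failing the stream.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                        return i + 1;
                    });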
        }
    
        @ApplicationScoped
        static class AmqpKeyValueExtractor implements KeyValueExtractor {
    
            @Override
            public boolean canExtract(Message<?> first, Type keyType, Type valueType) {
                return first.getMetadata(IncomingAmqpMetadata.class).isPresent();
            }
    
            @Override
            public Object extractKey(Message<?> message, Type keyType) {
                return message.getMetadata(IncomingAmqpMetadata.class)
                        .map(IncomingAmqpMetadata::getGroupId)
                        .orElse(null);
            }
    
            @Override
            public Object extractValue(Message<?> message, Type valueType) {
                return message.getPayload();
            }
        }
    

    The KeyValueExtractor is straightforward: for each incoming message, it extracts the key and the value to be used in the KeyedMulti.

    The process method is called once for each new group id and uses runSubscriptionOn to run the processing on a worker thread pool (you cannot use the @Blocking annotation in this scenario).

    Note that while message order is preserved within a group, processing happens concurrently across groups.
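
To make the ordering guarantee concrete outside of Quarkus, here is a minimal plain-JDK sketch of the same idea: tasks that share a group id run one after another, while different groups run in parallel on a worker pool. It is not how SmallRye implements KeyedMulti. The class GroupOrderedDispatcher and its submit and demo methods are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper for illustration; not part of SmallRye or Quarkus.
class GroupOrderedDispatcher {

    private final ExecutorService workers;
    // Last scheduled task of each group; the next task is chained behind it.
    // This sketch never evicts idle groups, so the map only grows.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    GroupOrderedDispatcher(ExecutorService workers) {
        this.workers = workers;
    }

    // Schedules the task to run after every earlier task of the same group.
    CompletableFuture<Void> submit(String groupId, Runnable task) {
        return tails.compute(groupId, (id, tail) -> {
            CompletableFuture<Void> previous =
                    tail == null ? CompletableFuture.completedFuture(null) : tail;
            // exceptionally(...) keeps one failed task from blocking the rest of its group.
            return previous.exceptionally(failure -> null).thenRunAsync(task, workers);
        });
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Sends three events to each of two groups and records the handling order per group.
    static Map<String, List<Integer>> demo() {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        GroupOrderedDispatcher dispatcher = new GroupOrderedDispatcher(workers);
        Map<String, List<Integer>> handled = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            // Earlier events take longer, so unordered processing would reverse them.
            long delay = (4 - i) * 10L;
            for (String group : List.of("a", "b")) {
                int event = group.equals("a") ? i : i * 10;
                pending.add(dispatcher.submit(group, () -> {
                    pause(delay);
                    handled.computeIfAbsent(group,
                            g -> Collections.synchronizedList(new ArrayList<>())).add(event);
                }));
            }
        }
        // Wait for all tasks, then release the worker threads.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        workers.shutdown();
        return handled;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> handled = demo();
        System.out.println("a -> " + handled.get("a")); // a -> [1, 2, 3]
        System.out.println("b -> " + handled.get("b")); // b -> [10, 20, 30]
    }
}
```

The sketch assumes that events of a given group are submitted in order from a single thread, which mirrors a single consumer reading the queue. Groups "a" and "b" overlap in time, but each group's list keeps its submission order.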