I am struggling to implement something and am hoping to get some guidance here.
We are switching to Azure Service Bus (ASB) and need to fit our incoming message processing into an existing framework.
Our requirements are that a given client instance only processes up to X messages at a time (on a given queue). Messages are processed on multiple threads, and the thread that receives a message may not be the one that consumes it. We also need auto-renewal of the message lock.
It seems our only option is the ServiceBusReceiverAsyncClient. (I tried using ServiceBusProcessorClient, but auto-lock renewal was halted as soon as I added the message to our local collection and returned.)
Here is what I started with:
private void recreateReceiver() {
    receiver = new ServiceBusClientBuilder()
            .credential(NAMESPACE, tokenCredential)
            .receiver()
            .queueName(queueName)
            .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
            .disableAutoComplete()
            .prefetchCount(0)
            .maxAutoLockRenewDuration(Duration.ofSeconds((long) Integer.MAX_VALUE))
            .buildAsyncClient();

    receiver.receiveMessages()
            .subscribe(
                    message -> makeMessageAvailable(message),
                    error -> processReceiveMessageError(error)
            );
}
public void makeMessageAvailable(ServiceBusReceivedMessage asbMessage) {
    // Custom back pressure: offer the message to a
    // java.util.concurrent.BlockingQueue with a max capacity of X.
    // This method does not return until it is able to add to the blocking queue.
}

public Message nextDelivery(long timeoutMS) {
    // Called by a different thread. Returns a message if one has been made
    // available by makeMessageAvailable(...); otherwise it blocks for up to timeoutMS.
    // The calling thread may pass the message to another thread, and one of
    // those threads will eventually call consume(...).
}

public void consume(Message message) {
    receiver.complete(message).block();
    // Also remove the message from the blocking queue so that
    // makeMessageAvailable(...) has capacity to make the next message
    // available to nextDelivery(...).
}
In my initial testing, the above works for the happy-path scenario (I haven't tested error handling). But then I read in the SDK documentation that I should be using "usingWhen" on the Flux, and it seems I'm not making proper use of Flux in general. I consulted an AI, which suggested using Flux itself to implement the back pressure, but that seemed quite tricky to fit into this framework; a BlockingQueue seems simpler.
I tried to make "usingWhen" work as below. But when running multiple clients, one client seemed to block the other from receiving messages. And even with just one client, it would sometimes just idle and not receive any messages.
receiverSubscription = Flux.usingWhen(
        Mono.just(receiver),
        receiver -> receiver.receiveMessages()
                .flatMap(message -> makeMessageAvailable(message)
                        .onErrorResume(e -> processMessageError(message, e))),
        receiver -> Mono.fromRunnable(receiver::close))
    .subscribe(
        unused -> {},
        error -> processReceiveError(error)
    );
The AI then also suggested the following. I don't at this point understand what it is even doing. I don't blindly use AI suggestions, but I gave it a quick try. With this, the client gets just one message and then stops receiving, even though makeMessageAvailable returns immediately for that first message. I didn't dig into what it is doing any further, since it doesn't even work.
receiver.receiveMessages()
        .subscribeOn(Schedulers.boundedElastic()) // Critical for blocking
        .parallel(1)                              // Single processing thread
        .runOn(Schedulers.boundedElastic())
        .doOnNext(message -> {
            makeMessageAvailable(message);
        })
        .sequential()
        .subscribe(
                null,
                error -> {
                    processReceiveError(error);
                }
        );
Flux is very complicated, and I'm brand new to it. I'll keep digging, but at this point it will take me quite a long time to understand Flux well enough to decide on the best course of action. I was hoping someone who understands Flux and ASB could suggest how to implement this. I think I'll have to look at using Flux for the backpressure if nobody suggests another way.
UPDATE: I did implement the throttling with Flux and added an answer to explain. I'm not marking it as the accepted answer, though, because I might be missing something.
To my delight and surprise, there is actually a reference guide: https://projectreactor.io/docs/core/release/reference/reactiveProgramming.html.
I was able to implement the throttling using Flux, but without "usingWhen". Preliminary smoke testing shows it working for the happy path (and it recovered automatically from a connection error), but I don't know if I'm missing important handling of the Flux, so I'm not marking this as the Accepted Answer (yet).
It seems as if I am on the right track, though.
Update: This is working, and with further reading it seems to be the most straightforward approach. I've marked this answer as accepted.
private void createReceiver() {
    receiverSubscriber = new ReceiverSubscriber();
    receiver.receiveMessages().subscribe(receiverSubscriber);
}
private void consume(Message message) {
    receiver.complete(message).block();
    synchronized (deliveryTagToUnackedDelivery) {
        deliveryTagToUnackedDelivery.remove(messageID);
        receiverSubscriber.checkAndRequestMore();
    }
}
private class ReceiverSubscriber extends BaseSubscriber<ServiceBusReceivedMessage> {

    private int pendingRequestCount;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        checkAndRequestMore();
    }

    @Override
    protected void hookOnNext(ServiceBusReceivedMessage message) {
        synchronized (deliveryTagToUnackedDelivery) {
            // makeMessageAvailable will add it to deliveryTagToUnackedDelivery and
            // a different thread will get the message from another method.
            // Another thread will eventually call the consume method.
            makeMessageAvailable(message);
            --pendingRequestCount;
            checkAndRequestMore();
        }
    }

    private void checkAndRequestMore() {
        // This method can be called from any number of threads
        // (via the consume(...) method), so synchronize to keep the counts accurate.
        synchronized (deliveryTagToUnackedDelivery) {
            // Requests are cumulative, so adjust for that using pendingRequestCount.
            final int remainingCapacity =
                    maxAllowedInProcess - deliveryTagToUnackedDelivery.size() - pendingRequestCount;
            if (remainingCapacity > 0) {
                pendingRequestCount += remainingCapacity;
                request(remainingCapacity);
            }
        }
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        processReceiveError(throwable);
    }
}
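Two things worth calling out for anyone else going this route: BaseSubscriber's default hookOnSubscribe requests Long.MAX_VALUE, so overriding it (as above) is what puts demand under your control, and request(n) is additive, which is why pendingRequestCount has to track what has been requested but not yet delivered. Below is a stripped-down, ASB-free demo of the same demand pattern; the cap of 3 and the names are just for illustration.
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class DemandDemo {
    public static void main(String[] args) {
        Flux.range(1, 10).subscribe(new BaseSubscriber<Integer>() {
            private int pendingRequestCount;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // Overriding this disables BaseSubscriber's default request(Long.MAX_VALUE).
                checkAndRequestMore();
            }

            @Override
            protected void hookOnNext(Integer value) {
                --pendingRequestCount;
                System.out.println("got " + value);
                // Nothing stays "in flight" in this demo, so demand refills immediately.
                checkAndRequestMore();
            }

            private void checkAndRequestMore() {
                // request(n) is cumulative, so only ask for the gap up to the cap of 3.
                int remainingCapacity = 3 - pendingRequestCount;
                if (remainingCapacity > 0) {
                    pendingRequestCount += remainingCapacity;
                    request(remainingCapacity);
                }
            }
        });
    }
}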