I have this sink configured in my app:
createdSink = Sinks.many().multicast().onBackpressureBuffer(4096);
public void activityCreated(ActivityResource createdActivity) {
    try {
        var notification = new ActivityCreatedNotification(createdActivity);
        createdSink.emitNext(notification, getFailureHandler());
    } catch (Exception e) {
        log.error("Emit activity created notification for activity id {} failed", createdActivity.getId(), e);
    }
}

private static Sinks.EmitFailureHandler getFailureHandler() {
    return Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(5));
}
Unfortunately, I'm not sure I understand what my failure handler does, or how I can configure it to do what I want, so I'm hoping to get some help here.
I sometimes observe an overflow exception like this:
2025-05-05T05:46:01.312Z ERROR [] [reactor-http-epoll-25]
[00000000-0000-4000-a000-000000000000][][][TRACE_ID:b42fa551661d9427][][c.e.d.p.a.e.ActivityExternalNotificationService.lambda$prepareExternalNotifications$3(79)]
- Error while processing activity notification
reactor.core.Exceptions$OverflowException: Backpressure overflow during Sinks.Many#emitNext
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:249)
This happens even though I have a backpressure buffer configured, so it probably means the buffer was already full and elements kept being emitted to the sink while one of the subscribers could not keep up with the rate, right?
What I would like in that case is to drop elements and to keep the sink from terminating.
This hot publisher, the sink, must be effectively infinite; it can't stop working.
The only option I found for dropping elements is the directBestEffort() method in Reactor:
Sinks.many().multicast().directBestEffort()
If I understood correctly, this would just drop elements when my subscribers can't handle the rate at which elements are emitted to the sink. That could be a solution, but it seems it does not let me keep a backpressure buffer like the one I have today.
So I guess the question is: how can I configure a backpressure buffer like the one I have today, which is nice to have (buffering some elements instead of dropping right away), while also avoiding the overflow error and dropping elements instead?
Is there a way to configure this using the failure handler, or by different means? Thanks!
How can I configure a backpressure queue like the one I have today, which is nice to have (buffer some elements instead of dropping right away), but also avoid the overflow error and drop instead?
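As far as I know, Sinks.many().multicast().onBackpressureBuffer(4096) does not drop on overflow. When the buffer is full, emitNext() discards the value and signals an OverflowException to the subscribers, which terminates the sink. The busyLooping() failure handler does not help here: it only retries when emissions collide concurrently (FAIL_NON_SERIALIZED), not on overflow.
If you want to stay with multicast, you have to handle the overflow explicitly. I would use tryEmitNext() rather than emitNext() and check whether the result is a failure. With all this in mind, the method body for activityCreated() would look something like this: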
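public void activityCreated(ActivityResource createdActivity) {
    var notification = new ActivityCreatedNotification(createdActivity);
    // tryEmitNext() reports a failed emission as a result instead of terminating the sink
    var result = createdSink.tryEmitNext(notification);
    if (result.isFailure()) {
        // result says why, e.g. FAIL_OVERFLOW (buffer full) or FAIL_NON_SERIALIZED (concurrent emit)
        log.warn("Dropping activity created event for activity id {}: {}", createdActivity.getId(), result);
        // or here you can have a fallback handler
    }
}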
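To make the resulting policy concrete (buffer up to a limit, drop once the buffer is full, never stop accepting), here is a minimal plain-JDK sketch. It is not Reactor code: DroppingBuffer and its methods are hypothetical names used only for illustration, and it models a single shared queue with no multicast. The point is just that ArrayBlockingQueue.offer() returning false plays roughly the same role as tryEmitNext() returning a failure result: the producer learns about the overflow and moves on.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of "buffer up to N, then drop"; not part of Reactor. */
class DroppingBuffer<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    DroppingBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking emit: returns false and counts a drop when the buffer is full. */
    boolean tryEmit(T value) {
        boolean accepted = queue.offer(value);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Consumer side: returns the oldest buffered element, or null if the buffer is empty. */
    T poll() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }

    int size() {
        return queue.size();
    }
}
```

With a capacity of 2, the third tryEmit() call is rejected and counted as a drop, and once the consumer polls an element the buffer accepts new elements again, which is the "keeps working after overflow" behaviour asked for in the question.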