What's the difference between creating a Flux directly by calling Flux.push
and use the sink within push
's lambada expression vs. using a sink provided by a DirectProcessor
?
In a minimal example where a Flux just emits a couple of events, I could do
Flux.<String>push(emitter -> {
emitter.next("One");
emitter.next("Two");
emitter.complete();
});
vs. using a DirectProcessor
var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();
Just to clarify: I know that I could use Flux.just
here, but my use case is actually building a bridge between Spring's @EventListener
s and Spring WebFlux, where I want to create a Flux for every incoming SSE request for a specific resource and then publish events to this Flux.
Could anybody tell me, if both approaches would be valid? Sure, there must be some difference. In particular, the Reactor Reference Guide section on DirectProcessor
states:
On the other hand, it has the limitation of not handling backpressure. As a consequence, a DirectProcessor signals an IllegalStateException to its subscribers if you push N elements through it but at least one of its subscribers has requested less than N.
What does that mean?
[EDIT:] In an earlier version of the question I was using Flux.generate()
instead of Flux.push()
, which is obviously wrong, because generate can create one event at most.
[EDIT 2:] @123 asked me for a full example of what I'm trying to achieve. Bear with me, it's a fair amount of code for an SO question:
I'd like to build a bridge between a (non-reactive) Spring domain event listener and a reactive Flux, which I can then use in a WebFlux endpoint to publish SSEs. The following code snippets use Lombok annotations for brevity.
Let's assume that I eventually want to publish the state of a user in an onboarding process as SSEs. Here's the enum:
public enum ProcessState {
CREATED(false),
VERIFIED(false),
AUTHORIZATION_PENDING(false),
AUTHORIZED(false),
ACTIVE(true);
@Getter
private final boolean terminalState;
ProcessState(boolean terminalState) {
this.terminalState = terminalState;
}
}
The non-reactive business logic will publish StateChangedEvents
whenever the state of any user is changed:
@Data
@RequiredArgsConstructor
public class StateChangedEvent {
private final UUID userId;
private final ProcessState newState;
}
And this is where my original question comes from. How would I build a bridge that translates this domain events into a Flux stream? My requirements:
This is what I've got so far:
@Component
@RequiredArgsConstructor
class EventBridge {
@RequiredArgsConstructor(access = PRIVATE)
private static class Subscriber {
private final UUID userId;
private final FluxSink<ProcessState> sink;
private boolean eventEmitted;
}
private final UserRepository repository;
private final Map<UUID, Subscriber> subscribers = new ConcurrentHashMap<>();
@EventListener
void stateChanged(StateChangedEvent event) {
notifySubscribers(event);
}
Flux<ProcessState> register(UUID userId) {
return Flux.push(emitter -> addSubscriber(userId, emitter));
}
private Subscriber addSubscriber(UUID userId, FluxSink<ProcessState> sink) {
var subscriptionId = randomUUID();
var subscriber = new Subscriber(userId, sink);
subscribers.put(subscriptionId, subscriber);
sink
.onRequest(n -> poll(subscriber))
.onDispose(() -> removeSubscriber(subscriptionId));
return subscriber;
}
private void poll(Subscriber subscriber) {
emit(subscriber, loadCurrentState(subscriber), true);
}
private ProcessState loadCurrentState(Subscriber subscriber) {
return repository.findById(subscriber.userId).getProcessState();
}
private void removeSubscriber(UUID subscriptionId) {
subscribers.remove(subscriptionId);
}
private void notifySubscribers(StateChangedEvent event) {
subscribers.values().stream()
.filter(subscriber -> subscriber.userId.equals(event.getUserId()))
.forEach(subscriber -> emit(subscriber, event.getNewState(), false));
}
private void emit(Subscriber subscriber, ProcessState processState, boolean onlyIfFirst) {
synchronized (subscriber) {
if (onlyIfFirst && subscriber.eventEmitted) {
return;
}
subscriber.sink.next(processState);
if (processState.isTerminalState()) {
subscriber.sink.complete();
}
subscriber.eventEmitted = true;
}
}
}
And finally the controller, where the bridge is used:
@RestController
@RequiredArgsConstructor
class UserController {
private final EventBridge eventBridge;
@GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
return eventBridge.register(userId).map(response -> ServerSentEvent.builder((ProcessState) response).build());
}
}
There are a couple of issues in my bridge code I can't wrap my head around:
Do I really have to synchronize on my Subscriber
instance to avoid writing stale events from poll
ing the initial state? If I don't it does happen that a StateChange event arrives and gets published before the current state is read from the repository, which is then pushed out of order. Surely, there must be a more elegant Flux-ish way to handle this without the synchronized keyword.
We already ruled out Flux.generate
, it seems to work with Flux.push
, Flux.create
will generate a whole lot more SSE events? Why? I fear, I don’t understand the differences between the three.
Rather then using the static methods on Flux
should I use a DirectProcessor
or any other processor here? I'm new to the whole reactive stack and the Spring Reactor documentation is rather too vague for me, tbh. Again: What are the differences? What about that comment about back pressure I mentioned above?
So if I understand what you are trying to do correctly I think your solution could be heavily simplified.
@Component
public class EventBridge {
private final UserRepository repository;
private final ReplayProcessor<StateChangedEvent> processor;
private final FluxSink<StateChangedEvent> sink;
EventBridge(UserRepository repository){
this.repository= repository;
//Replays events from last 100S for every new subscriber
this.processor = ReplayProcessor.createTimeout(Duration.ofSeconds(100L));
//Sink provides thread safe next,complete and error for subscribers
this.sink = processor.sink();
}
public void changeState(StateChangedEvent event) {
//Literally just pass event into sink, calls onNext on subscribers
sink.next(event);
}
public Flux<ProcessState> streamProcessStateForUser(UUID uuid){
return
//Search repository first, this isn't great since it blocks until
//the repo returns, although that seems to be what you want
//Also I added an Unknown to ProcessState, since it's better than
//it being null.
//Also you should probably return optional from repo.
Flux.concat(
Flux.just(
userRepo.findById(uuid).map(User::getProcessState).orElse(ProcessState.UNKNOWN)
),
processor
//Check the uuid matches the event
.filter(stateChangedEvent -> stateChangedEvent.getUserId().equals(uuid))
//Time out after 100 seconds, not needed but may be useful for you
.take(Duration.ofSeconds(100L))
//Complete flux when at terminal state
.takeUntil(stateChangedEvent -> stateChangedEvent.getNewState().isTerminalState())
//Convert to ProcessState from StateChangedEvent
.map(StateChangedEvent::getNewState)
);
}
}
Should be able to keep everything else the same.