springspring-webfluxspring-reactivespring-reactor

How to bridge between a non-reactive Spring EventListener and a reactive Flux


What's the difference between creating a Flux directly by calling Flux.push and use the sink within push's lambada expression vs. using a sink provided by a DirectProcessor?

In a minimal example where a Flux just emits a couple of events, I could do

Flux.<String>push(emitter -> {
   emitter.next("One");
   emitter.next("Two");
   emitter.complete();
 });

vs. using a DirectProcessor

var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();

Just to clarify: I know that I could use Flux.just here, but my use case is actually building a bridge between Spring's @EventListeners and Spring WebFlux, where I want to create a Flux for every incoming SSE request for a specific resource and then publish events to this Flux.

Could anybody tell me, if both approaches would be valid? Sure, there must be some difference. In particular, the Reactor Reference Guide section on DirectProcessor states:

On the other hand, it has the limitation of not handling backpressure. As a consequence, a DirectProcessor signals an IllegalStateException to its subscribers if you push N elements through it but at least one of its subscribers has requested less than N.

What does that mean?

[EDIT:] In an earlier version of the question I was using Flux.generate() instead of Flux.push(), which is obviously wrong, because generate can create one event at most.

[EDIT 2:] @123 asked me for a full example of what I'm trying to achieve. Bear with me, it's a fair amount of code for an SO question:

Full example of what I'm actually trying to do

I'd like to build a bridge between a (non-reactive) Spring domain event listener and a reactive Flux, which I can then use in a WebFlux endpoint to publish SSEs. The following code snippets use Lombok annotations for brevity.

Let's assume that I eventually want to publish the state of a user in an onboarding process as SSEs. Here's the enum:

public enum ProcessState {
  CREATED(false),
  VERIFIED(false),
  AUTHORIZATION_PENDING(false),
  AUTHORIZED(false),
  ACTIVE(true);

  @Getter
  private final boolean terminalState;

  ProcessState(boolean terminalState) {
    this.terminalState = terminalState;
  }

}

The non-reactive business logic will publish StateChangedEvents whenever the state of any user is changed:

@Data
@RequiredArgsConstructor
public class StateChangedEvent {
  private final UUID userId;
  private final ProcessState newState;
}

And this is where my original question comes from. How would I build a bridge that translates this domain events into a Flux stream? My requirements:

This is what I've got so far:

@Component
@RequiredArgsConstructor
class EventBridge {

  @RequiredArgsConstructor(access = PRIVATE)
  private static class Subscriber {
    private final UUID userId;
    private final FluxSink<ProcessState> sink;
    private boolean eventEmitted;
  }

  private final UserRepository repository;
  private final Map<UUID, Subscriber> subscribers = new ConcurrentHashMap<>();

  @EventListener
  void stateChanged(StateChangedEvent event) {
    notifySubscribers(event);
  }

  Flux<ProcessState> register(UUID userId) {
    return Flux.push(emitter -> addSubscriber(userId, emitter));
  }

  private Subscriber addSubscriber(UUID userId, FluxSink<ProcessState> sink) {
    var subscriptionId = randomUUID();
    var subscriber = new Subscriber(userId, sink);
    subscribers.put(subscriptionId, subscriber);
    sink
      .onRequest(n -> poll(subscriber))
      .onDispose(() -> removeSubscriber(subscriptionId));
    return subscriber;
  }

  private void poll(Subscriber subscriber) {
    emit(subscriber, loadCurrentState(subscriber), true);
  }

  private ProcessState loadCurrentState(Subscriber subscriber) {
    return repository.findById(subscriber.userId).getProcessState();
  }

  private void removeSubscriber(UUID subscriptionId) {
    subscribers.remove(subscriptionId);
  }

  private void notifySubscribers(StateChangedEvent event) {
    subscribers.values().stream()
      .filter(subscriber -> subscriber.userId.equals(event.getUserId()))
      .forEach(subscriber -> emit(subscriber, event.getNewState(), false));
  }

  private void emit(Subscriber subscriber, ProcessState processState, boolean onlyIfFirst) {
    synchronized (subscriber) {
      if (onlyIfFirst && subscriber.eventEmitted) {
        return;
      }
      subscriber.sink.next(processState);
      if (processState.isTerminalState()) {
        subscriber.sink.complete();
      }
      subscriber.eventEmitted = true;
    }
  }

}

And finally the controller, where the bridge is used:

@RestController
@RequiredArgsConstructor
class UserController {

  private final EventBridge eventBridge;

  @GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
  Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
    return eventBridge.register(userId).map(response -> ServerSentEvent.builder((ProcessState) response).build());
  }

}

There are a couple of issues in my bridge code I can't wrap my head around:


Solution

  • So if I understand what you are trying to do correctly I think your solution could be heavily simplified.

    @Component
    public class EventBridge {
    
        private final UserRepository repository;
        private final ReplayProcessor<StateChangedEvent> processor;
        private final FluxSink<StateChangedEvent> sink;
    
    
        EventBridge(UserRepository repository){
            this.repository= repository;
            //Replays events from last 100S for every new subscriber
            this.processor = ReplayProcessor.createTimeout(Duration.ofSeconds(100L));
            //Sink provides thread safe next,complete and error for subscribers
            this.sink = processor.sink();
        }
    
        public void changeState(StateChangedEvent event) {
            //Literally just pass event into sink, calls onNext on subscribers
            sink.next(event);
        }
    
        public Flux<ProcessState> streamProcessStateForUser(UUID uuid){
            return
                    //Search repository first, this isn't great since it blocks until 
                    //the repo returns, although that seems to be what you want
                    //Also I added an Unknown to ProcessState, since it's better than 
                    //it being null. 
                    //Also you should probably return optional from repo. 
                Flux.concat(
                        Flux.just(
                                userRepo.findById(uuid).map(User::getProcessState).orElse(ProcessState.UNKNOWN)
                        ),
                        processor
                                //Check the uuid matches the event
                                .filter(stateChangedEvent -> stateChangedEvent.getUserId().equals(uuid))
                                //Time out after 100 seconds, not needed but may be useful for you
                                .take(Duration.ofSeconds(100L))
                                //Complete flux when at terminal state
                                .takeUntil(stateChangedEvent -> stateChangedEvent.getNewState().isTerminalState())
                                //Convert to ProcessState from StateChangedEvent
                                .map(StateChangedEvent::getNewState)
                );
        }
    
    }
    

    Should be able to keep everything else the same.