java spring spring-boot spring-webflux project-reactor

WebFlux Sinks for multiservice streaming


I'm trying to build several reactive microservices. The producer looks like this:

@RestController
@RequiredArgsConstructor
public class EventController {

    private final Sinks.Many<Event> sink;

    @PostMapping()
    public void publish(@RequestParam Event event) {
        Sinks.EmitResult emitResult = sink.tryEmitNext(event);
        if (emitResult.isFailure()) {
            throw new RuntimeException("Error while publishing event");
        } else System.out.println("Received: " + event);
    }

    @GetMapping()
    public Mono<Event> getEvent() {
        return sink.asFlux().next();
    }
}

I configure the sink as follows:

    @Bean
    public Sinks.Many<Event> sink() {
        return Sinks.many().multicast().onBackpressureBuffer();
    }

Event is an enumeration, and it is the value being passed. In the producer service I am trying to expose two endpoints:

  1. An event generator that accepts an event as a @RequestParam and adds it to the stream.
  2. An endpoint that consumers call to get values.

There are also multiple consumers:

@EnableScheduling
@RequiredArgsConstructor
@SpringBootApplication
public class ReactiveEventsSubscriberApplication {

    private final WebClient webClient;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveEventsSubscriberApplication.class, args);
    }

    @Scheduled(fixedDelay = 3, timeUnit = TimeUnit.SECONDS)
    public void subscribe() {
        Mono<Event> eventMono = webClient.get()
                .retrieve()
                .bodyToMono(Event.class);
        eventMono.subscribe(System.out::println);
    }
}

I want service consumers to receive the latest event. When I try to create an event, nothing happens, as if there are no objects in the sink at all. What am I doing wrong?


Solution

  • You are trying to implement a publish-subscribe pattern across your microservices. However, your consumer service uses webClient.get() to retrieve events. This performs a one-off HTTP GET request, which may not be suitable for receiving events pushed by the producer service. I think the consumer should instead subscribe to the sink so that it receives events as the producer emits them.

    Your consumer should look something like this:

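    import jakarta.annotation.PostConstruct; // javax.annotation.PostConstruct on Spring Boot 2.x
    import lombok.RequiredArgsConstructor;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Sinks;

    @Service
    @RequiredArgsConstructor
    public class EventSubscriberService {

        // Injected Sinks.Many bean; it must be the instance the producer emits into
        private final Sinks.Many<Event> sink;

        @PostConstruct
        public void subscribeToEvents() {
            // Subscribe once at startup; each emitted event is pushed to this callback
            sink.asFlux().subscribe(event -> {
                System.out.println("Received event: " + event);
                // Process the event as needed
            });
        }
    }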
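
To make the push-based flow described above concrete without any Spring or Reactor dependency, here is a minimal sketch that uses the JDK's java.util.concurrent.SubmissionPublisher as a stand-in for Sinks.Many. It is not the Reactor API, and the two types differ in buffering details. The class name PushSubscribeSketch, the CollectingSubscriber helper, and the Event values are hypothetical and only serve the illustration. The point is the ordering: the subscriber registers first, then the producer pushes events to it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class PushSubscribeSketch {

    // Hypothetical stand-in for the Event enumeration from the question.
    enum Event { CREATED, UPDATED, DELETED }

    // Records every event pushed to it and counts down a latch per event.
    static final class CollectingSubscriber implements Flow.Subscriber<Event> {
        final List<Event> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch latch;

        CollectingSubscriber(int expectedEvents) {
            this.latch = new CountDownLatch(expectedEvents);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }

        @Override
        public void onNext(Event event) {
            received.add(event);
            latch.countDown();
        }

        @Override
        public void onError(Throwable throwable) { }

        @Override
        public void onComplete() { }

        // Waits up to five seconds for the expected number of events.
        boolean await() {
            try {
                return latch.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Subscribes first, then pushes two events, and returns what was received.
    static List<Event> run() {
        CollectingSubscriber subscriber = new CollectingSubscriber(2);
        try (SubmissionPublisher<Event> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);   // consumer registers interest
            publisher.submit(Event.CREATED);   // producer pushes events
            publisher.submit(Event.UPDATED);
            subscriber.await();                // delivery is asynchronous
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println("Received: " + run()); // prints "Received: [CREATED, UPDATED]"
    }
}
```

The consumer never polls here. It receives each event as soon as the producer submits it, which is the behaviour the sink subscription above is aiming for.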
    

    Also, in your producer controller, I would suggest using @RequestBody instead of @RequestParam and passing the Event attributes in the request body when calling the API.