spring-boot, reactive-programming, spring-webflux, spring-cloud-stream, spring-mono

Spring Cloud : Supplier continuously publishing Kafka events instead of one?


My Spring Cloud Stream Supplier continuously publishes events to Kafka. How can I publish only one?

public static HashMap<String, Ticker> transactionsOfAccount = new HashMap<>(0);
public LinkedList<Ticker> lists = new LinkedList<>();

Producer.class

@Bean
public Supplier<Message<Ticker>> messageSupplier() {

    return () -> {
        if (tickerPublisher.lists.peek() != null) {
            Message<Ticker> msg = MessageBuilder
                    .withPayload(tickerPublisher.lists.peek())
                    .build();
            log.info("Total Size is {}",tickerPublisher.lists.size());
            log.info("Message: {}", msg.getPayload());
            tickerPublisher.lists.get(0).setStatus(Status.SUCESS);
            return  msg;
        } else {
            return null;
        }
    };
}

RestController.class

@GetMapping(value = "/quote-mono", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<Ticker> getQuoteMono(@RequestParam("symbol") String symbol) {
    tickerPublisher.publisherMono(symbol);
    return mono;
}

PublisherService.class

public void publisherMono(String ticker) {
    String path = ticker.toUpperCase() + "/prices/realtime?api_key=" + apiKey;
    this.webClient
            .get()
            .uri(path)
            .retrieve()
            .bodyToMono(Ticker.class)
            .flatMap(data -> sendToKafka(ticker, data))
            .doOnNext(data -> {
                log.info("next events from published : {}", data);
                if (transactionsOfAccount.containsKey(ticker) && !lists.isEmpty()) {
                    log.info("list is clear now ");
                    transactionsOfAccount.clear();
                }
            })
            .subscribe(
                    data -> {
                        log.info("data is {}", data);
                        this.sinkMono.tryEmitValue((Ticker) data);
                    },
                    (err) -> log.info(String.valueOf(err)),
                    () -> {
                        log.info("Completed");
                    }
            );
    log.info(lists.toString());
}

The issue is that the Supplier keeps publishing duplicate events to the Kafka broker.

I need help finding the problem. How can we publish a single event from a Mono when the response is a single object?

The REST response is:

{"last_price":245.3,"last_time":"2022-12-09T22:44:53.000Z","last_size":null,"bid_price":244.98,"bid_size":100,"ask_price":250.0,"ask_size":96,"open_price":246.4,"close_price":null,"high_price":248.2,"low_price":244.37,"exchange_volume":1168680,"market_volume":null,"updated_on":"2022-12-09T22:59:58.183Z","source":"bats_delayed","security":{"id":"sec_XaL6mg","ticker":"MSFT","exchange_ticker":"MSFT:UW","figi":"BBG000BPHFS9","composite_figi":"BBG000BPH459"}}

The expectation is to publish only one event when using WebClient.


Solution

  • The Supplier programming model in Spring Cloud Stream is poll-based: the framework invokes the supplier at a configured interval, which defaults to once per second. You can change that interval through the poller configuration; see the Spring Cloud Stream reference documentation for more details.

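    However, changing the interval is unlikely to help in your case, because you want exactly one event per request rather than one per poll. You probably do not want a supplier at all and should publish programmatically using something like StreamBridge.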
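
To make the difference concrete, here is a minimal plain-Java sketch that runs without Spring. It simulates the poller by calling a Supplier several times, and it contrasts a supplier that only peeks at the queue (as the one in the question does) with one that removes the head, and with imperative publishing. The `EventSender` interface, the `pollSupplier` helper, and the binding name `ticker-out-0` are hypothetical stand-ins for illustration; in a real application you would inject StreamBridge and call its `send(bindingName, payload)` method instead.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;

public class PollingDemo {

    // Hypothetical stand-in for StreamBridge so the sketch runs without Spring.
    // The real StreamBridge offers a comparable send(String bindingName, Object data).
    interface EventSender {
        boolean send(String bindingName, Object payload);
    }

    // Simulated poller (an assumption of this sketch): calls the supplier
    // `polls` times and records every non-null result as a published event.
    public static List<String> pollSupplier(Supplier<String> supplier, int polls) {
        List<String> published = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            String event = supplier.get();
            if (event != null) {
                published.add(event);
            }
        }
        return published;
    }

    // peek() never removes the head, so every poll sees the same element again.
    public static List<String> publishWithPeek(int polls) {
        LinkedList<String> queue = new LinkedList<>(List.of("MSFT"));
        return pollSupplier(queue::peek, polls);
    }

    // poll() removes the head when reading it, so later polls get null.
    public static List<String> publishWithPoll(int polls) {
        LinkedList<String> queue = new LinkedList<>(List.of("MSFT"));
        return pollSupplier(queue::poll, polls);
    }

    // Imperative publishing: exactly one send per incoming request, no poller.
    public static List<String> publishOnRequest(String payload) {
        List<String> published = new ArrayList<>();
        EventSender sender = (binding, data) -> published.add(binding + ":" + data);
        sender.send("ticker-out-0", payload); // hypothetical binding name
        return published;
    }

    public static void main(String[] args) {
        System.out.println(publishWithPeek(3));        // [MSFT, MSFT, MSFT]
        System.out.println(publishWithPoll(3));        // [MSFT]
        System.out.println(publishOnRequest("MSFT"));  // [ticker-out-0:MSFT]
    }
}
```

Removing the head with poll() would stop the duplicates even with a supplier, but the supplier would still be invoked on every polling interval. Sending from the request path avoids the poller entirely, which matches the one-request, one-event expectation in the question.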