Spring Cloud Stream: Supplier continuously publishes events to Kafka. How can I publish only one?
public static HashMap<String, Ticker> transactionsOfAccount = new HashMap<>(0);
public LinkedList<Ticker> lists = new LinkedList<>();
Producer.class
@Bean
public Supplier<Message<Ticker>> messageSupplier() {
    return () -> {
        if (tickerPublisher.lists.peek() != null) {
            Message<Ticker> msg = MessageBuilder
                    .withPayload(tickerPublisher.lists.peek())
                    .build();
            log.info("Total Size is {}", tickerPublisher.lists.size());
            log.info("Message: {}", msg.getPayload());
            tickerPublisher.lists.get(0).setStatus(Status.SUCESS);
            return msg;
        } else {
            return null;
        }
    };
}
RestController.class
@GetMapping(value = "/quote-mono", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<Ticker> getQuoteMono(@RequestParam("symbol") String symbol) {
    tickerPublisher.publisherMono(symbol);
    return mono;
}
PublisherService.class
public void publisherMono(String ticker) {
    String path = ticker.toUpperCase() + "/prices/realtime?api_key=" + apiKey;
    this.webClient
            .get()
            .uri(path)
            .retrieve()
            .bodyToMono(Ticker.class)
            .flatMap(data -> sendToKafka(ticker, data))
            .doOnNext(data -> {
                log.info("next events from published : {}", data);
                if (transactionsOfAccount.containsKey(ticker) && !lists.isEmpty()) {
                    log.info("list is clear now ");
                    transactionsOfAccount.clear();
                }
            })
            .subscribe(
                    data -> {
                        log.info("data is {}", data);
                        this.sinkMono.tryEmitValue((Ticker) data);
                    },
                    (err) -> log.info(String.valueOf(err)),
                    () -> {
                        log.info("Completed");
                    }
            );
    log.info(lists.toString());
}
The issue is that the Supplier keeps publishing duplicate events to the Kafka broker.
I need help finding the cause: how can I publish a single event from a Mono when the response is a single object?
The REST response is:
{"last_price":245.3,"last_time":"2022-12-09T22:44:53.000Z","last_size":null,"bid_price":244.98,"bid_size":100,"ask_price":250.0,"ask_size":96,"open_price":246.4,"close_price":null,"high_price":248.2,"low_price":244.37,"exchange_volume":1168680,"market_volume":null,"updated_on":"2022-12-09T22:59:58.183Z","source":"bats_delayed","security":{"id":"sec_XaL6mg","ticker":"MSFT","exchange_ticker":"MSFT:UW","figi":"BBG000BPHFS9","composite_figi":"BBG000BPH459"}}
The expectation is that only one event is published for the WebClient response.
The supplier programming model in Spring Cloud Stream is intended for polling: the framework invokes the supplier at a configured interval, which defaults to once per second. You can change that interval through configuration. See the Spring Cloud Stream reference documentation for more details.
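To illustrate why a polled supplier can republish the same item, here is a minimal sketch in plain Java with no Spring involved. The `simulate` method is a hypothetical stand-in for the framework's poller, and the class and method names are made up for this example. It assumes, as in the question, that the supplier reads the head of a `LinkedList` with `peek()`, which returns the element without removing it, and contrasts that with `poll()`, which removes it.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierPollingDemo {

    // Hypothetical stand-in for the poller: invokes the supplier `polls` times
    // and records every non-null result as a "published" event.
    public static List<String> simulate(Supplier<String> supplier, int polls) {
        List<String> published = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            String event = supplier.get();
            if (event != null) {
                published.add(event);
            }
        }
        return published;
    }

    // peek() leaves the head in the list, so every poll sees the same element.
    public static List<String> publishWithPeek(int polls) {
        LinkedList<String> queue = new LinkedList<>(List.of("MSFT"));
        return simulate(queue::peek, polls);
    }

    // poll() removes the head, so later polls get null and publish nothing.
    public static List<String> publishWithPoll(int polls) {
        LinkedList<String> queue = new LinkedList<>(List.of("MSFT"));
        return simulate(queue::poll, polls);
    }

    public static void main(String[] args) {
        System.out.println(publishWithPeek(3)); // [MSFT, MSFT, MSFT]
        System.out.println(publishWithPoll(3)); // [MSFT]
    }
}
```

With `peek()`, one queued ticker is emitted on every poll, which matches the duplicate events described in the question. Draining with `poll()` stops the repeats in this sketch, though the supplier is still invoked on every polling interval.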
However, changing the polling interval may not help in your case. You probably do not want a supplier at all and should instead publish programmatically using something like StreamBridge.
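A rough sketch of the on-demand approach follows, written without Spring dependencies so it can run on its own. The `EventSender` interface is a hypothetical stand-in that mirrors the shape of `StreamBridge.send(String bindingName, Object data)`. The binding name `ticker-out-0` and the class names are assumptions for illustration, not taken from the question. In a real application you would inject `StreamBridge` and pass `streamBridge::send` where the lambda is used here.

```java
import java.util.ArrayList;
import java.util.List;

public class OnDemandPublishDemo {

    // Hypothetical stand-in with the same shape as StreamBridge.send(String, Object).
    @FunctionalInterface
    public interface EventSender {
        boolean send(String bindingName, Object payload);
    }

    // Publishes only when asked to; there is no poller involved.
    public static class TickerPublisher {
        private final EventSender sender;
        private final String bindingName;

        public TickerPublisher(EventSender sender, String bindingName) {
            this.sender = sender;
            this.bindingName = bindingName;
        }

        // Intended to be called once per REST response, so one response yields one event.
        public boolean publish(Object ticker) {
            return sender.send(bindingName, ticker);
        }
    }

    // Publishes each response once and returns what reached the sender.
    public static List<String> run(List<String> responses) {
        List<String> sent = new ArrayList<>();
        // In a Spring application: new TickerPublisher(streamBridge::send, "ticker-out-0")
        TickerPublisher publisher = new TickerPublisher(
                (binding, payload) -> sent.add(binding + ":" + payload),
                "ticker-out-0"); // assumed binding name
        for (String response : responses) {
            publisher.publish(response);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("MSFT"))); // [ticker-out-0:MSFT]
    }
}
```

In the question's code, the natural place for such a call would be inside the WebClient pipeline, for example in `doOnNext`, so that each response triggers exactly one send. Note that `StreamBridge.send` is a blocking call, which may matter inside a reactive chain.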