What I would like to achieve
Given a Flux<MyPojo>, send an HTTP request with the JSON payload for each element to a third-party web service, fire-and-forget style, while still being able to return the Flux for follow-up processing.
Information
The third-party web server, which I have no control over, takes a constant 5 seconds to process one request.
The third-party server does not offer any bulk or list API; requests must be sent one by one.
The third-party server is known to be very reliable, and I am allowed to hit it as hard as possible.
I do not care much about the response; in particular, there is no value in waiting 5 seconds for a response I will do nothing with.
What I have tried
private Flux<MyPojo> sendMyPojoFireAndForget(Flux<MyPojo> myPojoFlux) {
    return myPojoFlux.flatMap(oneMyPojo -> webClient.post()
            .uri("http://example.com/path")
            .bodyValue(oneMyPojo)
            .exchangeToMono(clientResponse -> Mono.just(oneMyPojo)));
}
However, this is not fire and forget. Judging from my own logs and some logs from the third party, it seems I am still waiting for each response.
I also tried
webClient.post()
        .uri("http://example.com/path")
        .bodyValue(oneMyPojo)
        .retrieve()
        .bodyToMono(Void.class)
        .subscribe();
But this makes me "lose" the Flux of MyPojo and end up with a Flux of Void instead.
Question:
You can attach multiple hooks to a publisher (such as a Mono or a Flux). In this case, you can use doOnNext() before subscribing:
@Test
void test() {
    Flux<MyPojo> myPojoFlux = Flux.just(
            new MyPojo(1),
            new MyPojo(2),
            new MyPojo(3),
            new MyPojo(4),
            new MyPojo(5)
    );
    myPojoFlux
            // Side effect: start the request for each element, then pass the element on
            .doOnNext(this::sendPostRequest)
            .subscribe(this::doSomethingElse);
}

private void doSomethingElse(MyPojo myPojo) {
    System.out.println(myPojo);
}

private void sendPostRequest(MyPojo oneMyPojo) {
    // post() rather than get(): the request spec returned by get() cannot carry a body
    webClient.post()
            .uri(...)
            .bodyValue(oneMyPojo)
            .retrieve()
            .bodyToMono(String.class)
            // The inner subscribe() starts the request without making the outer Flux wait
            .subscribe(resp -> System.out.println(oneMyPojo + " " + resp));
}
Since the client works asynchronously as long as you don't block, the doSomethingElse method will be called regardless of the response of sendPostRequest.
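The timing behaviour can be checked without a real server. The sketch below is a JDK-only analogy, not Reactor or WebClient code: `slowCall` is a hypothetical stand-in for the 5-second third-party request (shortened here to 500 ms), and `CompletableFuture.runAsync` plays the role of the inner `subscribe()`. The class name, the `completed` counter, and `sendAllFireAndForget` are all made up for illustration. Each item triggers an asynchronous call and is passed on immediately, so the items should be available well before any simulated response arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class FireAndForgetDemo {

    // Counts simulated requests that have finished (hypothetical bookkeeping for the demo).
    static final AtomicInteger completed = new AtomicInteger();

    // Hypothetical stand-in for the slow third-party call; 500 ms instead of 5 s.
    static void slowCall(int item) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        completed.incrementAndGet();
    }

    // Starts one asynchronous call per item and returns the items without waiting for the calls.
    // The futures are collected in 'pending' only so that the demo can wait for them at the end.
    static List<Integer> sendAllFireAndForget(List<Integer> items,
                                              List<CompletableFuture<Void>> pending) {
        List<Integer> passedOn = new ArrayList<>();
        for (int item : items) {
            pending.add(CompletableFuture.runAsync(() -> slowCall(item)));
            passedOn.add(item);
        }
        return passedOn;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        long start = System.nanoTime();
        List<Integer> passedOn = sendAllFireAndForget(List.of(1, 2, 3), pending);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Passed on " + passedOn + " after " + elapsedMs + " ms");

        // Only the demo waits here, so the JVM does not exit before the background calls finish.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        System.out.println("Completed calls: " + completed.get());
    }
}
```

As with the inner `subscribe()` in the answer, nothing downstream sees a failure of the background call, so in real code it is probably worth logging errors inside the fire-and-forget branch.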