I have a Flux of items returned from another service:
Flux<Tweet> tweetsByUserId = restUtils.getTweetsByUserId(userId);
I want this Flux to be saved to the database and, in parallel, sent to a REST endpoint to be deleted:
tweetRepository.saveAll(tweetsByUserId).collectList().map(lis -> lis.size()).doOnNext(System.out::println);
return deleteTweets(tweetsByUserId);
The deleteTweets method executes successfully, but the Flux of Tweet objects is never saved to MongoDB. From what I have learned, a pipeline only starts once we subscribe, and I thought doOnNext was enough to start it. Can someone explain why nothing is persisted and what I am missing?
In the reactive world, nothing happens until you subscribe. doOnNext only registers a side-effect callback on the chain; it does not subscribe by itself.
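The same "declared but not yet running" behaviour can be seen with plain JDK streams, which makes for a rough analogy (this is java.util.stream, not Reactor, and the class and method names below are made up for illustration). Here peek plays a role loosely comparable to doOnNext, and the terminal collect call plays the role of subscribe():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; an analogy using JDK streams, not Reactor itself.
class LazyPipelineDemo {

    // Builds a pipeline with a side effect but does NOT run it.
    static Stream<String> buildPipeline(List<String> log) {
        return Stream.of("tweet-1", "tweet-2")
                .peek(log::add); // side effect only, comparable to doOnNext
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Stream<String> pipeline = buildPipeline(log);

        // Nothing has been processed yet: no terminal operation was called.
        System.out.println("before terminal op: " + log); // prints "before terminal op: []"

        // The terminal operation triggers the pipeline, comparable to subscribe().
        pipeline.collect(Collectors.toList());
        System.out.println("after terminal op: " + log); // prints "after terminal op: [tweet-1, tweet-2]"
    }
}
```

The side effect in peek only fires once the terminal operation runs, just as the doOnNext callback in the question never fires because nothing subscribes to the chain.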
So you need to add .subscribe() to your reactive chain of commands:
tweetRepository.saveAll(tweetsByUserId)
    .collectList()
    .map(lis -> lis.size())
    .doOnNext(System.out::println)
    .doOnError(System.err::println)
    .subscribe(); // <-- this is the important part!!!
return deleteTweets(tweetsByUserId);
However, in the above snippet the save is triggered independently of the deletion, typically on a separate thread, so the deletions might be performed while the saving is still in progress.
So you might run into a race condition here. This code cannot guarantee that every tweet has been saved before it is deleted.
To guarantee this, both saving and deletion must be part of the same reactive stream.
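The ordering argument can be sketched without Reactor. The example below uses the JDK's CompletableFuture as a stand-in, where thenCompose plays a role similar to flatMap; the save and delete methods are hypothetical placeholders, not the real repository calls. Because the delete step is composed onto the save step, it cannot start before the save has completed:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; CompletableFuture is used as an analogy for a reactive chain.
class ChainedSaveThenDelete {

    // Placeholder for an asynchronous "save all" operation.
    static CompletableFuture<List<String>> save(List<String> ids, List<String> events) {
        return CompletableFuture.supplyAsync(() -> {
            events.add("saved " + ids);
            return ids;
        });
    }

    // Placeholder for an asynchronous "delete all" operation.
    static CompletableFuture<Integer> delete(List<String> ids, List<String> events) {
        return CompletableFuture.supplyAsync(() -> {
            events.add("deleted " + ids);
            return ids.size();
        });
    }

    // Chains delete onto save, so delete only starts once save has finished.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        save(List.of("1", "2"), events)
                .thenCompose(saved -> delete(saved, events))
                .join(); // block here only so the demo can inspect the result
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[saved [1, 2], deleted [1, 2]]"
    }
}
```

If save and delete were instead started as two unrelated futures, the order of the two events would depend on scheduling, which is the race described above.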
I would suggest further improvements to make the code a little bit more idiomatic in terms of reactive programming.
The following snippet does almost the same as the first one. But it does not break the reactive stream, i.e. the order of events is taken care of.
// Saves all tweets first, then deletes them, within one reactive stream.
void deleteTweetsById(Flux<Tweet> tweetsByUserId) {
    tweetRepository.saveAll(tweetsByUserId)
        .collectList()                 // completes once every tweet is saved
        .flatMap(this::deleteTweets)   // only then start deleting
        .map(List::size)
        .doOnNext(System.out::println)
        .subscribe();
}

// Deletes each tweet by ID and emits the list of deleted IDs.
// Assumes Tweet.getId() returns a String.
Mono<List<String>> deleteTweets(List<Tweet> tweets) {
    return Flux.fromIterable(tweets)
        .flatMap(tweet -> tweetRepository
            .deleteById(tweet.getId())
            .then(Mono.just(tweet.getId())))
        .collectList();
}