java spring-reactive spring-mono

How to block the main thread until all Mono calls complete?


I'm making multiple Mono calls to the DB, and the results of all of them are needed to compute a final result. That computation is written after the Mono logic shown here:

    if (SomeObject.getAccountLevelActiveList() != null) {
        SomeObject.getAccountLevelActiveList().parallelStream().forEach(account -> {
            Mono<SubLine> subLineMono = SubLineService
                    .getLineLevelCustProfile(preNbsLineLevelConverter.getSubLine(account));

            subLineMono.subscribe(subLine -> PollObject.getSubList()
                    .put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine)));
        });
    }

But my main logic executes before the Mono results are stored in PollObject, so I get null from PollObject. I want to stop the main thread until the Mono results have been stored in PollObject.


Solution

  • If you want to stop the main thread, you can block instead of subscribing. First convert the List into a Flux, then flatMap it using the provided Mono. The logic from your subscribe call can move into the side-effect operator doOnNext, on either the Mono or the wrapping Flux:

    Flux.fromIterable(SomeObject.getAccountLevelActiveList())
        .flatMap(account ->
            SubLineService.getLineLevelCustProfile(
                preNbsLineLevelConverter.getSubLine(account))
        ).doOnNext(subLine ->
            PollObject.getSubList().put(accountLevelMtn.getMtn(),
                Optional.ofNullable(subLine))
        ).blockLast();
        // the code that follows runs only after all Monos have completed
    

    If the code following the if does not have to run on the main thread, it is better to stay reactive, as @chrylis-cautiouslyoptimistic already suggested. Use the reduce operator to combine all the results into a Mono that completes when all the provided Monos have completed:

    Flux.fromIterable(SomeObject.getAccountLevelActiveList())
        .flatMap(account ->
            SubLineService.getLineLevelCustProfile(
                preNbsLineLevelConverter.getSubLine(account))
        ).reduce(PollObject.getSubList(), (subList, subLine) -> {
            // assuming getSubList() is a java.util.Map: put returns the previous
            // value, not the map, so the accumulator must return the map itself
            subList.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine));
            return subList;
        }).map(subList -> {
            // the code here runs only after all Monos have completed
            return subList;
        })
        // ... other operators if necessary
        // eventually subscribe, or return the Mono for further processing
        .subscribe();
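
The question's version most likely sees an empty PollObject because, when the source is asynchronous (as a DB call typically is), subscribe only registers a callback and returns at once. The sketch below illustrates just that principle using only the JDK, so it runs without Reactor on the classpath. CompletableFuture stands in for Mono (unlike a Mono, it starts eagerly), thenAccept plays the role of subscribe, and allOf(...).join() plays the role of blockLast(). The class name, the lookup method, and the 50 ms delay are all hypothetical; this is an analogy, not Reactor code.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

class WaitForAllLookups {

    // Hypothetical stand-in for SubLineService.getLineLevelCustProfile:
    // the result becomes available about 50 ms later, on another thread.
    static CompletableFuture<String> lookup(String account) {
        return CompletableFuture.supplyAsync(
                () -> "subline-" + account,
                CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS));
    }

    // Mirrors the question: register callbacks, then return without waiting.
    static Map<String, Optional<String>> withoutWaiting(List<String> accounts) {
        Map<String, Optional<String>> results = new ConcurrentHashMap<>();
        accounts.forEach(account ->
                lookup(account).thenAccept(s -> results.put(account, Optional.ofNullable(s))));
        return results; // the callbacks have most likely not run yet
    }

    // Mirrors the blocking fix: wait until every lookup has stored its result.
    static Map<String, Optional<String>> waitingForAll(List<String> accounts) {
        Map<String, Optional<String>> results = new ConcurrentHashMap<>();
        CompletableFuture<?>[] pending = accounts.stream()
                .map(account -> lookup(account)
                        .thenAccept(s -> results.put(account, Optional.ofNullable(s))))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(pending).join(); // blocks the calling thread
        return results;
    }

    public static void main(String[] args) {
        List<String> accounts = List.of("a", "b", "c");
        System.out.println("without waiting: " + withoutWaiting(accounts).size() + " entries");
        System.out.println("after join:      " + waitingForAll(accounts).size() + " entries");
    }
}
```

Note that the account is captured inside the same lambda that stores the result, so each entry is keyed by the account it belongs to. The same idea applies in Reactor if the doOnNext is attached to the inner Mono inside flatMap.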