project-reactor spring-data-mongodb-reactive

Process a reactive query result concurrently


I need to iterate over a massive query result and perform some business logic on each entity. The database is connected with Spring Data Mongo Reactive so I get a Flux<Entity>.
The simple implementation would be:

Query query = new Query();
query.withReadPreference(ReadPreference.secondary());

reactiveMongoTemplate.find(query, Entity.class)
    .flatMap(this::processEntity)
    .doOnError(e -> log.error("there was an error", e))
    .doOnComplete(() -> doSomethingWhenEverythingIsDone())
    .subscribe();

But this takes too long. Each entity can be processed independently, so why not process them concurrently? I can't get that to work properly, though: it neither uses more resources nor increases processing speed.

I tried using a ParallelFlux with runOn(Schedulers.parallel()).
Current implementation:

Query query = new Query();
query.cursorBatchSize(properties.getDatabaseCursorBatchsizePerThread() * properties.getParallelism());
query.withReadPreference(ReadPreference.secondary());

reactiveMongoTemplate.find(query, Entity.class)
    .parallel(properties.getParallelism(), properties.getDatabaseCursorBatchsizePerThread())
    .runOn(Schedulers.parallel())
    .flatMap(this::processEntity)
    .doOnError(e -> log.error("there was an error", e))
    .sequential()
    .doOnComplete(() -> doSomethingWhenEverythingIsDone())
    .subscribe();

I could not get any performance improvement from this code.
I'm familiar with Project Reactor but new to parallel computing with the framework.
I suspect that I'm waiting for something that I don't want to wait for.
Can you help me fix the concurrency?
I'm not sure what the right approach is for this case.
I'm limited by neither the database nor the network; I'm monitoring both closely.

Thank you for the help

PS: I don't use the return value of the processEntity method; I just flatMap to Mono.empty(). I don't use doOnNext because the consumer is notified first, which causes issues if you expect the processing in every doOnNext to have completed. Using flatMap fixed this issue for me.


Solution

  • Update #2: The OP ran into issues with the processEntity method, which is not shown in the question, so I moved my version of processEntity into a separate, non-reactive method.

    Update: The OP mentioned in a comment that the return value of processEntity is not of interest, so I changed the return type to Mono<Void> in the example code and switched to Mono.fromRunnable.

    Assuming that processEntity is wrapped in Mono.fromCallable or Mono.fromRunnable, adding .subscribeOn(Schedulers.boundedElastic()) to each inner Mono subscribes it on a boundedElastic worker thread, so flatMap runs the calls in parallel.

    Example output from the code below:

    15:27:17.914 [boundedElastic-2] INFO  FluxTest -- Start processing number: 2
    15:27:17.914 [boundedElastic-1] INFO  FluxTest -- Start processing number: 1
    15:27:17.914 [boundedElastic-3] INFO  FluxTest -- Start processing number: 3
    15:27:18.919 [boundedElastic-2] INFO  FluxTest -- Completed processing number: 2
    15:27:18.919 [boundedElastic-3] INFO  FluxTest -- Completed processing number: 3
    15:27:18.919 [boundedElastic-1] INFO  FluxTest -- Completed processing number: 1
    15:27:18.919 [boundedElastic-1] INFO  FluxTest -- Done!
    
    

    Test

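    import org.junit.jupiter.api.Test; // assumes JUnit 5; use org.junit.Test for JUnit 4
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import reactor.test.StepVerifier;

    public class ReactiveTest {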
    
        private static final Logger log = LoggerFactory.getLogger(ReactiveTest.class);
    
        // processEntity should be non-reactive
        void processEntity(int number) {
            log.info("Start processing number: {}", number);
            try {
                Thread.sleep(1000); // Emulate long-running process
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new RuntimeException(e);
            }
            log.info("Completed processing number: {}", number);
        }
    
        // wrap call to processEntity in Mono.fromRunnable
        public Mono<Void> fromRunnableWrapper(int number) {
            Mono<Void> runnable = Mono.fromRunnable(() -> processEntity(number));
            return runnable.subscribeOn(Schedulers.boundedElastic());
        }
    
        void doSomethingWhenEverythingIsDone() {
            log.info("Done!");
        }
    
        @Test
        public void test() {
            var flux = Flux.just(1, 2, 3)
                    .flatMap(this::fromRunnableWrapper)
                    .doOnError(e -> log.error("there was an error", e))
                    .doOnComplete(this::doSomethingWhenEverythingIsDone);
    
            StepVerifier.create(flux)
                    .verifyComplete();
        }
    }
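
For a massive query result it may also be worth capping how many entities are processed at once. The overload flatMap(mapper, concurrency) limits the number of inner Monos subscribed at the same time; without it, flatMap defaults to 256, while Schedulers.boundedElastic() caps its threads at 10 times the number of CPU cores by default, so the surplus tasks queue up. The following is a minimal sketch, not the OP's code: it assumes reactor-core is on the classpath, and the class name BoundedConcurrencySketch, the constant MAX_CONCURRENCY, the counters, and the 100 ms sleep are hypothetical stand-ins added only to make the effect observable.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BoundedConcurrencySketch {

    // Hypothetical cap on simultaneously running tasks; tune it for the real workload.
    static final int MAX_CONCURRENCY = 4;

    // Counters used only to observe how many tasks run at the same time.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Stand-in for the blocking, non-reactive processEntity from the answer.
    static void processEntity(int number) {
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(100); // emulate blocking work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            inFlight.decrementAndGet();
        }
    }

    // Same wrapping as in the answer: run the blocking call on boundedElastic.
    static Mono<Void> fromRunnableWrapper(int number) {
        return Mono.<Void>fromRunnable(() -> processEntity(number))
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Processes 'count' items and returns the highest number of tasks seen running at once.
    static int run(int count) {
        inFlight.set(0);
        peak.set(0);
        Flux.range(1, count)
                // The second argument limits how many inner Monos are subscribed concurrently.
                .flatMap(BoundedConcurrencySketch::fromRunnableWrapper, MAX_CONCURRENCY)
                .blockLast(Duration.ofSeconds(30)); // block only for this demo
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + run(20));
    }
}
```

In the original pipeline the same idea would presumably amount to passing a concurrency value as the second argument of the flatMap call on the Flux returned by reactiveMongoTemplate.find(...), with properties.getParallelism() being one candidate for that value.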