I need to iterate over a massive query result and perform some business logic on each entity. The database is connected with Spring Data Mongo Reactive so I get a Flux<Entity>.
The simple implementation would be:
```java
Query query = new Query();
query.withReadPreference(ReadPreference.secondary());
reactiveMongoTemplate.find(query, Entity.class)
        .flatMap(this::processEntity)
        .doOnError(e -> log.error("there was an error", e))
        .doOnComplete(() -> doSomethingWhenEverythingIsDone())
        .subscribe();
```
But this takes too long. Since each entity can be processed independently, why not process them concurrently? I can't get that to work properly, though: it neither uses more resources nor increases processing speed.
I tried using a ParallelFlux with runOn(Schedulers.parallel()).
Current implementation:
```java
Query query = new Query();
query.cursorBatchSize(properties.getDatabaseCursorBatchsizePerThread() * properties.getParallelism());
query.withReadPreference(ReadPreference.secondary());
reactiveMongoTemplate.find(query, Entity.class)
        .parallel(properties.getParallelism(), properties.getDatabaseCursorBatchsizePerThread())
        .runOn(Schedulers.parallel())
        .flatMap(this::processEntity)
        .doOnError(e -> log.error("there was an error", e))
        .sequential()
        .doOnComplete(() -> doSomethingWhenEverythingIsDone())
        .subscribe();
```
I could not get any performance improvement from this code.
I'm familiar with Project Reactor but new to parallel computation with the framework.
I suspect that I'm waiting for something that I don't want to wait for.
Can you help me fix the concurrency? I'm not sure what the right approach is for this case.
I'm neither database- nor network-limited; I'm monitoring both closely.
Thank you for the help.
PS: I don't use the return value of the processEntity method; instead I just flatMap to Mono.empty(). I don't use doOnNext because the consumer is notified first, which causes issues if you expect all of the doOnNext processing to have completed. Switching to flatMap fixed this issue for me.
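A minimal sketch of the flatMap-to-Mono<Void> pattern described in the PS, assuming the per-entity work is a blocking call (here a hypothetical sideEffect method that sleeps and bumps a counter). Because each entity is mapped to an inner Mono, flatMap only completes after every inner Mono has completed, so the value read in doOnComplete already includes every entity.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class CompletionOrderSketch {

    // Hypothetical blocking side effect standing in for processEntity.
    static void sideEffect(int entity, AtomicInteger processed) {
        try {
            Thread.sleep(50); // emulate slow, blocking work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        processed.incrementAndGet();
    }

    // Returns how many entities had been processed when doOnComplete ran.
    static int processedAtCompletion(List<Integer> entities) {
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger seenAtCompletion = new AtomicInteger(-1);
        Flux.fromIterable(entities)
                // Each entity becomes a Mono<Void>; flatMap waits for all of them.
                .flatMap(e -> Mono.<Void>fromRunnable(() -> sideEffect(e, processed))
                        .subscribeOn(Schedulers.boundedElastic()))
                .doOnComplete(() -> seenAtCompletion.set(processed.get()))
                .then()   // Mono<Void>: only the completion signal remains
                .block(); // blocking is acceptable in a demo, not inside a pipeline
        return seenAtCompletion.get();
    }

    public static void main(String[] args) {
        System.out.println(processedAtCompletion(List.of(1, 2, 3, 4, 5))); // prints 5
    }
}
```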
Update #2: The OP is experiencing some issues with the method processEntity, which is not shown in the question. I moved my version of processEntity into a separate, non-reactive method.
Update: The OP mentioned in a comment that the return value of processEntity is not of interest, so I changed the return type to Mono<Void> in the example code and switched to Mono.fromRunnable.
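If processEntity does return a value that is simply not needed, a sketch of the Mono.fromCallable variant might look like this. It assumes a hypothetical blocking processEntity that returns a String; then() discards that value so the wrapper still exposes Mono<Void>.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FromCallableSketch {

    // Counts calls so the demo can check that every entity was processed.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical blocking method whose return value nobody needs.
    static String processEntity(int number) {
        calls.incrementAndGet();
        return "processed-" + number;
    }

    // fromCallable runs the blocking call on subscription, subscribeOn moves that
    // subscription to a boundedElastic thread, and then() drops the value.
    static Mono<Void> processAsync(int number) {
        return Mono.fromCallable(() -> processEntity(number))
                .subscribeOn(Schedulers.boundedElastic())
                .then();
    }

    static int processAll(List<Integer> numbers) {
        calls.set(0);
        Flux.fromIterable(numbers)
                .flatMap(FromCallableSketch::processAsync)
                .blockLast(); // the Flux<Void> emits no elements, so this returns null
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of(1, 2, 3))); // prints 3
    }
}
```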
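Assuming that processEntity is wrapped in Mono.fromCallable or Mono.fromRunnable, adding .subscribeOn(Schedulers.boundedElastic()) makes the executions run in parallel. Without it, flatMap subscribes to each inner Mono on the thread that emits the entity, so the blocking work runs right there and the next entity is only handled once the previous call returns. With subscribeOn, each inner subscription is handed to a boundedElastic worker thread (the scheduler Reactor provides for blocking work), and flatMap keeps up to 256 inner Monos in flight by default.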
Example output from the code below:
```
15:27:17.914 [boundedElastic-2] INFO FluxTest -- Start processing number: 2
15:27:17.914 [boundedElastic-1] INFO FluxTest -- Start processing number: 1
15:27:17.914 [boundedElastic-3] INFO FluxTest -- Start processing number: 3
15:27:18.919 [boundedElastic-2] INFO FluxTest -- Completed processing number: 2
15:27:18.919 [boundedElastic-3] INFO FluxTest -- Completed processing number: 3
15:27:18.919 [boundedElastic-1] INFO FluxTest -- Completed processing number: 1
15:27:18.919 [boundedElastic-1] INFO FluxTest -- Done!
```
Test:
```java
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public class ReactiveTest {
    private static final Logger log = LoggerFactory.getLogger(ReactiveTest.class);

    // processEntity should be non-reactive
    void processEntity(int number) {
        log.info("Start processing number: {}", number);
        try {
            Thread.sleep(1000); // Emulate long-running process
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new RuntimeException(e);
        }
        log.info("Completed processing number: {}", number);
    }

    // wrap call to processEntity in Mono.fromRunnable and run it on boundedElastic
    public Mono<Void> fromRunnableWrapper(int number) {
        Mono<Void> runnable = Mono.fromRunnable(() -> processEntity(number));
        return runnable.subscribeOn(Schedulers.boundedElastic());
    }

    void doSomethingWhenEverythingIsDone() {
        log.info("Done!");
    }

    @Test
    public void test() {
        var flux = Flux.just(1, 2, 3)
                .flatMap(this::fromRunnableWrapper)
                .doOnError(e -> log.error("there was an error", e))
                .doOnComplete(this::doSomethingWhenEverythingIsDone);
        StepVerifier.create(flux)
                .verifyComplete();
    }
}
```
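For a large Mongo result, it may be worth capping how many entities are processed at once, for example with the question's properties.getParallelism(). One way to do that, sketched here under the assumption that the per-entity work stays blocking and wrapped as in the test above, is the two-argument flatMap(mapper, concurrency) overload. Without it flatMap allows up to 256 concurrent inner subscriptions, and boundedElastic by default caps its threads at 10 times the number of CPU cores. The counters and the 50 ms sleep are purely illustrative.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BoundedConcurrencySketch {

    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();
    static final AtomicInteger processed = new AtomicInteger();

    // Hypothetical blocking per-entity work that tracks how many calls overlap.
    static void processEntity(int number) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(50); // emulate slow, blocking work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            inFlight.decrementAndGet();
        }
        processed.incrementAndGet();
    }

    static Mono<Void> fromRunnableWrapper(int number) {
        return Mono.<Void>fromRunnable(() -> processEntity(number))
                .subscribeOn(Schedulers.boundedElastic());
    }

    // parallelism plays the role of properties.getParallelism() from the question.
    // Returns the highest number of overlapping processEntity calls observed.
    static int processAll(int count, int parallelism) {
        inFlight.set(0);
        maxInFlight.set(0);
        processed.set(0);
        Flux.range(1, count)
                // The second argument limits how many inner Monos are subscribed
                // at the same time; flatMap requests more entities as they finish.
                .flatMap(BoundedConcurrencySketch::fromRunnableWrapper, parallelism)
                .then()
                .block();
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        int max = processAll(20, 3);
        System.out.println("processed=" + processed.get() + ", max in flight=" + max);
    }
}
```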