The question is not specific to database, but depicted it with the database example
This works with below
reactiveTemplate
.query(cql, itemMapper) // This returns an item object
.doOnNext(this::saveRecord) //this save item Object
.doOnError(this::processErrorRecord); // Logs the exception
But If I want to log which item got the error, how to access failedIte in doOnError
method?
reactiveTemplate
.query(cql, itemMapper) // This returns an item object
.doOnNext(this::saveRecord) //this save item Object
.doOnError(th -> processErrorRecord(<failed Item>,th)); //how to access the item which got error
It's hard to suggest something without knowing all details but here are your options. doOnNext
is an operator typically used for "side-effect" logic such as logging, metrics, ... to execute non-reactive code. I'm not really sure why saveRecord
is not reactive because it looks like you are using reactiveTemplate
to query data.
In case saveRecord
is using reactiveTemplate
you should return Mono<Void>
(not void
). In this case you can apply the following
reactiveTemplate
.query(cql, itemMapper) // This returns an item object
.flatMap(record ->
saveRecord(record) //this save item Object
.doOnError(th -> {
log.error("error while saving record: {}", record);
processErrorRecord(record, th);
})
);
Not sure what is the logic of processErrorRecord
but in case this is reactive, use onErrorResume
instead of doOnError
.
In case saveRecord
is not reactive and blocking, you need to execute it on a separate Scheduler
. Check How Do I Wrap a Synchronous, Blocking Call? for details
reactiveTemplate
.query(cql, itemMapper) // This returns an item object
.flatMap(record ->
Mono.fromRunnable(() -> saveRecord(record))
.subscribeOn(Schedulers.boundedElastic())
.doOnError(th -> {
log.error("error while saving record: {}",record)
processErrorRecord(record, th);
})
);