I have already asked this question on Google Groups but have not received any response yet, so I am posting it here for a different audience.
We are using Reactive-Kafka for our application. Our scenario is as follows: we want to stop sending messages to the consumer if any exception occurs while processing a message. The message should be retried after a stipulated time, or on explicit request from the consumer side. With our current approach, if, let's say, the consumer's database is down for some time, the application will still read from Kafka and try to process the messages, and processing will keep failing due to the DB issue. This keeps the application busy unnecessarily. Instead, we want to pause the consumer from receiving messages for a stipulated time (say, wait 30 minutes before retrying). We are not sure how to handle that case.
Is this possible? Or am I missing something?
Here is the sample code, adapted from Reactive-Kafka:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1) { msg =>
    Future {
      /**
       * Unreliable consumer; e.g. saving to the DB might fail because the DB is down
       */
    }.map(_ => msg.committableOffset).recover {
      case ex =>
        /**
         * HOW TO DO ????????
         * On exception, I would like to tell the stream to stop sending messages,
         * pause the consumer, and retry after a stipulated time or on demand,
         * from the last committed offset
         */
        throw ex
    }
  }
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
    batch.updated(elem)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)
There is a recoverWithRetries combinator for this purpose. For reference, see this answer and the docs.
You could extract your source:
val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
  .mapAsync(1) { msg =>
    Future {
      /**
       * Unreliable consumer; e.g. saving to the DB might fail because the DB is down
       */
    }.map(_ => msg.committableOffset)
  }
and then do
src
  .recoverWithRetries(attempts = -1, {
    case e: MyDatabaseException =>
      logger.error(e.getMessage, e)
      src.delay(30.minutes, DelayOverflowStrategy.backpressure)
  })
  ...
(Passing attempts = -1 means retrying indefinitely.)
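Putting the pieces together, a full sketch might look like the following. This is an illustration under assumptions, not a drop-in implementation: `MyDatabaseException`, `saveToDb`, and the `consumerSettings` placeholder are hypothetical stand-ins for the application's own code, and exact APIs (e.g. `commitScaladsl`, `CommittableOffsetBatch.empty`) vary across Reactive-Kafka/Alpakka Kafka versions — the snippet follows the versions used in the question.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.DelayOverflowStrategy
import akka.stream.scaladsl.Sink

object PausingConsumerExample {
  implicit val system: ActorSystem = ActorSystem("pausing-consumer")
  import system.dispatcher

  // Hypothetical pieces standing in for the application's own code:
  final class MyDatabaseException(msg: String) extends RuntimeException(msg)
  def saveToDb(value: String): Future[Unit] = ??? // may fail with MyDatabaseException
  val consumerSettings: ConsumerSettings[String, String] = ???

  // The source from the question, extracted into a val so that
  // recoverWithRetries can re-materialize it after a failure.
  lazy val src =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("topic1"))
      .mapAsync(1) { msg =>
        saveToDb(msg.record.value()).map(_ => msg.committableOffset)
      }

  // On a database failure the current materialization is torn down and
  // replaced by a fresh copy of `src` whose first element is delayed by
  // 30 minutes; the new consumer resumes from the last committed offset.
  src
    .recoverWithRetries(attempts = -1, {
      case e: MyDatabaseException =>
        system.log.error(e, "Database unavailable, pausing consumption for 30 minutes")
        src
          .delay(30.minutes, DelayOverflowStrategy.backpressure)
          .mapMaterializedValue(_ => NotUsed)
    })
    .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
      (batch, elem) => batch.updated(elem)
    }
    .mapAsync(3)(_.commitScaladsl())
    .runWith(Sink.ignore)
}
```

Note the `delay` with `DelayOverflowStrategy.backpressure`: it holds back the first element of the fresh source for 30 minutes, which effectively pauses consumption before the retry, and since offsets were only committed after successful processing, the failed message is redelivered.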