I use the Alpakka Kafka connector to run a consumer stream in Akka Streams. The outer stream starts an inner stream for each partition assigned to this consumer instance. That means that if an error occurs inside a partition stream, I need to restart either that partition or everything.
The code is as follows:
Consumer.DrainingControl<Done> control =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
        .mapAsyncUnordered(
            maxPartitions,
            pair -> {
              Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
                  pair.second();
              return source
                  .via(businessThatMayCrash())
                  .map(message -> message.committableOffset())
                  .runWith(Committer.sink(committerSettings), system);
            })
        .toMat(Sink.ignore(), Consumer::createDrainingControl)
        .run(system);
See Separate streams per partition https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition
But how can I restart the streams? Please note: if there is an error on the Kafka side, it resumes automatically.
I tried to use RestartSource for each partition, but once the stream has failed, the Kafka consumer does not work anymore.
https://doc.akka.io/docs/akka/current/stream/operators/RestartSource/onFailuresWithBackoff.html
Also, control.streamCompletion() does not get completed.
There is also the option to add a watchTermination() for each partition that calls control.drainAndShutdown(..), but that seems quite complicated.
https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/watchTermination.html
What is the best solution for that?
The trick is to keep a failure of the stream you're running inside mapAsyncUnordered from failing the mapAsyncUnordered stage itself.
The easiest way to do this is to construct a CompletableFuture which will succeed whether the stream succeeds or fails.
// inside mapAsyncUnordered, apologies for any Java atrocities (Scala dev...)
pair -> {
  return
      pair.second()
          .via(businessLogicThatMayCrash())
          .map(message -> message.committableOffset())
          .runWith(Committer.sink(committerSettings), system)
          .handle((ignoredResult, ignoredFailure) -> akka.Done.done());
}
When the stream being run fails, a new source should be picked up by the committablePartitionedSource.
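To see why the handle call keeps a failure from propagating, here is a minimal sketch using only the JDK's CompletableFuture, with no Akka or Kafka involved. The class HandleDemo and the helper completeRegardless are hypothetical names made up for this illustration; the "done" string stands in for akka.Done.done(), and the logging line is an addition that the snippet above does not have.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class HandleDemo {

    // Hypothetical helper: maps any outcome (success or failure) to a normal
    // completion, mirroring the .handle(...) call in the snippet above.
    static <T> CompletionStage<String> completeRegardless(CompletionStage<T> stage) {
        return stage.handle((result, failure) -> {
            if (failure != null) {
                // Log instead of rethrowing, so the returned stage still completes normally.
                System.err.println("partition stream failed: " + failure.getMessage());
            }
            return "done"; // stand-in for akka.Done.done()
        });
    }

    public static void main(String[] args) {
        // Simulates the materialized value of a partition stream that crashed.
        CompletableFuture<Integer> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("boom"));

        CompletableFuture<String> guarded = completeRegardless(failing).toCompletableFuture();

        System.out.println(guarded.isCompletedExceptionally()); // prints "false"
        System.out.println(guarded.join());                     // prints "done"
    }
}
```

Because the stage returned from handle never completes exceptionally, an operator such as mapAsyncUnordered only ever sees successful completions. Logging the failure inside handle, as sketched here, is probably worth doing in practice so that crashes in a partition stream do not go unnoticed.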