apache-kafka, error-handling, akka, alpakka

Akka Kafka restart on internal failure


I use the Alpakka Kafka connector to run a consumer stream inside Akka Streams. The outer stream starts an inner stream for each partition that is assigned to this consumer instance. That means that if any error occurs inside a partition stream, I need to restart either that partition stream or everything.

The code is as follows:

Consumer.DrainingControl<Done> control =
    Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsyncUnordered(
        maxPartitions,
        pair -> {
          Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
              pair.second();
          return source
              .via(businessThatMayCrash())
              .map(message -> message.committableOffset())
              .runWith(Committer.sink(committerSettings), system);
        })
    .toMat(Sink.ignore(), Consumer::createDrainingControl)
    .run(system);

See Separate streams per partition https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#source-per-partition

But how can I restart the streams? Please note: if there is an error on the Kafka side, the stream resumes automatically.

I tried wrapping each partition stream in a RestartSource, but once the stream has failed, the Kafka consumer no longer works. https://doc.akka.io/docs/akka/current/stream/operators/RestartSource/onFailuresWithBackoff.html Also, control.streamCompletion() never completes.

There is also the option of adding a watchTermination() to each partition stream that calls control.drainAndShutdown(..), but that seems quite complicated. https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/watchTermination.html

What is the best solution for that?


Solution

  • The trick is to keep a failure of the inner stream (the one you run inside mapAsyncUnordered) from failing the mapAsyncUnordered stage itself.

    The easiest way to do this is to construct a future (a CompletionStage in the Java API) that completes successfully whether the inner stream succeeds or fails.

    // inside mapAsyncUnordered, apologies for any Java atrocities (Scala dev...)
    pair -> {
      return
         pair.second()
            .via(businessThatMayCrash())
            .map(message -> message.committableOffset())
            .runWith(Committer.sink(committerSettings), system)
            // handle() runs on success and on failure alike, so the returned
            // stage always completes normally and never fails mapAsyncUnordered
            .handle((ignoredResult, ignoredFailure) -> akka.Done.done());
    }
    

    When the stream being run fails, a new source should be picked up by the committablePartitionedSource.
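
To see why handle(...) shields mapAsyncUnordered, the same call can be tried in isolation with plain java.util.concurrent, without Akka or Kafka. This is only a minimal sketch: the class HandleDemo, the method completeRegardless and the "Done" string (standing in for akka.Done) are made up for illustration and are not part of Alpakka.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Plain-JDK illustration only; "Done" is a stand-in string, not akka.Done.
class HandleDemo {

  // Maps both outcomes of `stage` to the same successful value, so a caller
  // (mapAsyncUnordered in the answer above) never observes the failure.
  static <T> CompletionStage<String> completeRegardless(CompletionStage<T> stage) {
    return stage.handle((ignoredResult, ignoredFailure) -> "Done");
  }

  public static void main(String[] args) {
    // A stage that has already failed, like a crashed partition stream.
    CompletableFuture<Integer> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("business logic crashed"));

    // A stage that completed normally.
    CompletableFuture<Integer> succeeded = CompletableFuture.completedFuture(42);

    // Both lines print "Done"; join() does not throw for the failed stage.
    System.out.println(completeRegardless(failed).toCompletableFuture().join());
    System.out.println(completeRegardless(succeeded).toCompletableFuture().join());
  }
}
```

Because the failure is swallowed entirely, it is probably worth logging the second argument of handle (when it is non-null) before returning, otherwise a crashing partition stream would restart silently.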