I am using Akka Streams Kafka and I am looking for a way to do the following: start at x and read x, x+1, x+2, ... until the last item.
Code would look something like:
Consumer
  .plainSource(consumerSettings, subscription)
  .runForeach(_ => println("got record!")) // runForeach expects a function, not a Unit value
  .onComplete {
    case Success(_) => // all items read
    case Failure(error) => // error
  }
and it would complete after the last element has been read. Maybe this is not the way this library is intended to be used. How can I achieve this?
The Akka Kafka consumer works in a "pulling" fashion and stays alive forever unless an error occurs, for example while connecting to the broker. But when do you consider the stream to be over? Kafka can be thought of as a distributed log from which you read messages starting at a given offset. As long as your client is connected to the broker, it will be up and running. If you consider the stream finished when no events arrive from Kafka for some time interval, for example, you could use idleTimeout:
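import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.{Failure, Success}

Consumer
  .plainSource(consumerSettings, subscription)
  .idleTimeout(10.seconds)
  .runForeach(e => println("E"))
  .onComplete {
    case Success(_) => // the source completed on its own
    case Failure(_: TimeoutException) =>
      // no element arrived for ten seconds, so the stream failed with a
      // TimeoutException: treat this as "all items read"
    case Failure(error) => // any other error
  }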
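If you would rather see a normal completion than a failure, one option is to recover from the timeout with an empty source. The following is only a sketch: the Kafka source is replaced by a hypothetical stand-in (three elements followed by `Source.maybe`, which never completes on its own), the one-second timeout is arbitrary, and it uses `ActorMaterializer` like the rest of this answer (on Akka 2.6+ the implicit `ActorSystem` alone should be enough).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.TimeoutException
import scala.concurrent.Await
import scala.concurrent.duration._

object IdleTimeoutSketch {

  // Runs the stand-in stream and returns everything read before it went idle.
  def collectUntilIdle(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("idle-timeout-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Hypothetical stand-in for Consumer.plainSource: emits three elements
      // and then stays open without ever completing, like a Kafka source.
      val standIn = Source(List("a", "b", "c")).concat(Source.maybe[String])

      val done = standIn
        .idleTimeout(1.second) // fail the stream after one second of silence
        .recoverWithRetries(1, {
          // swap the timeout failure for an empty source, i.e. a clean completion
          case _: TimeoutException => Source.empty[String]
        })
        .runWith(Sink.seq)

      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(collectUntilIdle())
}
```

With this in place, onComplete on the materialized future would see Success after the idle period, while any other failure still surfaces as Failure.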
Another possibility is a fan-in stage, specifically MergePreferred. We can create a second source with Source.tick that emits an element at a fixed interval. The Kafka source is the preferred input, so as long as elements keep arriving from Kafka, the stage always pulls from that source. Once a tick has fired and no Kafka element is waiting, a "Timeout" string is pushed downstream. Something like:
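import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Keep, MergePreferred, Source}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

implicit val actorSystem = ActorSystem("test-actor-system")
implicit val streamMaterializer = ActorMaterializer()
implicit val ec = actorSystem.dispatcher

// Kafka records reduced to their String values
val consumer =
  Consumer
    .plainSource(consumerSettings, subscription)
    .map(_.value())

// Emits "Timeout" every 30 seconds. The first tick also waits 30 seconds;
// a very short initial delay could end the stream before the first record arrives.
val tick = Source.tick(30.seconds, 30.seconds, "Timeout")

val source = GraphDSL.create(consumer, tick)(Keep.both) { implicit b ⇒
  (r1, r2) ⇒
    import GraphDSL.Implicits._ // brings the ~> operator into scope
    val merge = b.add(MergePreferred[String](1, false))
    r2 ~> merge.in(0)     // ticks go to the secondary input
    r1 ~> merge.preferred // Kafka records go to the preferred input
    SourceShape(merge.out)
}

Source
  .fromGraph(source)
  .takeWhile(el => el != "Timeout") // complete at the first "Timeout"
  .runForeach(msg => println(msg))
  .onComplete {
    case Success(_) => println("Stream ended")
    case Failure(error) => println("There was an error")
  }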
With takeWhile, the stream stays active as long as elements keep arriving from Kafka and completes at the first "Timeout" element.
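To try the MergePreferred idea without a broker, the Kafka source can be swapped for a stand-in. This is a sketch under assumptions, not a drop-in solution: `standIn` is a hypothetical source (three values, then silence via `Source.maybe`), the one-second tick is arbitrary, and `ActorMaterializer` is used as in the rest of this answer.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Keep, MergePreferred, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergePreferredSketch {

  // Runs the merged stream and returns everything seen before the first "Timeout".
  def collectUntilTimeout(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("merge-preferred-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Hypothetical stand-in for the Kafka source: three values, then nothing.
      val standIn = Source(List("1", "2", "3")).concat(Source.maybe[String])
      // First tick after one second, then one tick per second.
      val tick = Source.tick(1.second, 1.second, "Timeout")

      val merged = Source.fromGraph(
        GraphDSL.create(standIn, tick)(Keep.none) { implicit b => (data, timer) =>
          import GraphDSL.Implicits._
          val merge = b.add(MergePreferred[String](1, eagerComplete = false))
          timer ~> merge.in(0)    // secondary input: only used when no data is waiting
          data ~> merge.preferred // preferred input: always wins when data is available
          SourceShape(merge.out)
        }
      )

      // takeWhile completes the stream when the first "Timeout" marker arrives.
      val done = merged.takeWhile(_ != "Timeout").runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(collectUntilTimeout())
}
```

Note that the marker string shares the element type with the data, so a real record whose value is "Timeout" would also end the stream; wrapping elements in a small sealed type would avoid that.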
This is only one approach. Akka Streams has many other stages, plus the Graph API, that may handle this situation more elegantly.