I am new using akka streams kafka (and akka streams in general) . I am trying to construct a graph in order to publish a message to different topics.
How can I connect the producer as flow in order to commit the processed messages ? I tried using Producer.flow but I can't get the commitScaladsl
object TestFoo {
import akka.kafka.ProducerMessage.Message
implicit val system = ActorSystem("test-kafka")
implicit val materializer = ActorMaterializer()
val evenNumbersTopic = "even_numbers"
val allNumbersTopic = "all_numbers"
lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int])
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
lazy val source = Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic)))
val producerSettings = ProducerSettings(system, new StringSerializer(), new StringSerializer())
.withBootstrapServers("localhost:9092")
val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import akka.stream.scaladsl.GraphDSL.Implicits._
type TypedMessage = Message[String, Int,CommittableOffset]
val bcast = b.add(Broadcast[TypedMessage](2))
val merge = b.add(Merge[TypedMessage](2))
val evenFilter = Flow[TypedMessage].filter ( c => c.record.value() % 2 == 0)
val justEven = Flow[TypedMessage].map{
case Message(pr, offset) =>
val r = new ProducerRecord[String, Int]("general", pr.value())
Message(r, offset)
}
val allNumbers = Flow[TypedMessage].map{
case Message(pr, offset) =>
val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value())
Message(r, offset)
}
val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg =>
val r = new ProducerRecord[String, Int]("general", msg.record.value())
Message(r, msg.committableOffset)
}
source ~> toMsg ~> bcast
bcast ~> evenFilter ~> justEven ~> merge
bcast ~> allNumbers ~> merge
merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result =>
result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl()
}
ClosedShape
})}
Because you are using the GraphDSL, the compiler cannot infer the PassThrough
type from the previous stage.
Try and explicitly pass the type parameters to the Producer.flow
function, e.g.
merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result =>
result.message.passThrough.commitScaladsl()
}
I have left K
and V
as unbound param, please fit there whatever key/value types your Producer is bound to produce. If you want the code above to be correctly wired, you'll need to match the producerSettings
types with what comes from the merge stage. You'll need something like:
val producerSettings = ProducerSettings(system, new StringSerializer(), new JsonSerializer[Int])
.withBootstrapServers("localhost:9092")