scala, apache-kafka, akka-stream, reactive-kafka

How to 'Chunk and Re-assemble' large messages in Reactive Kafka using Akka Streams


When sending a large file through Kafka, is it possible to split it into chunks distributed across partitions and then reassemble it using Akka Streams, as described in this presentation?

http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297


Solution

  • The "chunking" side, i.e. the producer, is straightforward to write with Reactive Kafka:

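    import akka.kafka.scaladsl.Producer
    import akka.stream.Materializer
    import akka.stream.scaladsl.Source
    import org.apache.kafka.clients.producer.ProducerRecord

    case class LargeMessage(bytes: Seq[Byte], topic: String)

    // Each chunk of at most maxMessageSize bytes becomes one record. Assumes producerSettings
    // is a ProducerSettings[String, Array[Byte]] (ByteArraySerializer for values) in scope.
    def messageToKafka(message: LargeMessage, maxMessageSize: Int)(implicit mat: Materializer) =
      Source.fromIterator(() => message.bytes.iterator)
        .grouped(maxMessageSize)
        .map(chunk => new ProducerRecord[String, Array[Byte]](message.topic, chunk.toArray))
        .runWith(Producer.plainSink(producerSettings)) // completes with a Future[Done]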
    

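    The "re-assembling" side, i.e. the consumer, can be implemented along the lines of the Reactive Kafka documentation. Kafka only guarantees ordering within a single partition, so this version assumes the chunks arrive in the order they were sent (for example, because they were all written to the same partition):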

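    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.Sink
    val messageFut: scala.concurrent.Future[LargeMessage] = // assumes consumerSettings (byte-array values), numChunks, implicit Materializer and ExecutionContext
      Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
        .take(numChunks) // plainSource never completes on its own, so stop after the expected chunk count
        .mapConcat(record => record.value.toList) // flatten each chunk back into bytes
        .runWith(Sink.seq[Byte]).map(bytes => LargeMessage(bytes, topic))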
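Neither snippet lets the consumer tell which chunks belong to which message, or in what order they were sent, which matters once chunks are spread over several partitions. Below is a minimal sketch of one way to handle that, assuming a hypothetical `Chunk` envelope (message id, chunk index, chunk count) that the producer would serialize into each record value. The names `Chunk`, `Chunking.split` and `Reassembler` are illustrative and not part of Reactive Kafka; the sketch uses only the Scala standard library so the chunking and reassembly logic can be checked without a broker.

```scala
import scala.collection.mutable

// Hypothetical envelope for one chunk; in practice it would be serialized into the record value.
final case class Chunk(messageId: String, index: Int, total: Int, payload: Vector[Byte])

object Chunking {
  // Split a payload into chunks of at most maxChunkSize bytes, tagging each with its position.
  def split(messageId: String, bytes: Seq[Byte], maxChunkSize: Int): Vector[Chunk] = {
    require(maxChunkSize > 0, "maxChunkSize must be positive")
    val groups = bytes.grouped(maxChunkSize).map(_.toVector).toVector
    // An empty payload still yields one empty chunk so the consumer sees the message.
    val parts = if (groups.isEmpty) Vector(Vector.empty[Byte]) else groups
    parts.zipWithIndex.map { case (part, i) => Chunk(messageId, i, parts.size, part) }
  }
}

// Buffers chunks per messageId and returns the full payload once every index has arrived,
// whatever order the chunks were consumed in.
final class Reassembler {
  private val pending = mutable.Map.empty[String, Map[Int, Vector[Byte]]]

  def offer(chunk: Chunk): Option[Vector[Byte]] = {
    val parts = pending.getOrElse(chunk.messageId, Map.empty[Int, Vector[Byte]]) +
      (chunk.index -> chunk.payload)
    val complete = (0 until chunk.total).forall(parts.contains)
    if (complete) {
      pending.remove(chunk.messageId)
      // Concatenate the chunk payloads in index order.
      Some((0 until chunk.total).flatMap(i => parts(i)).toVector)
    } else {
      pending.update(chunk.messageId, parts)
      None
    }
  }
}
```

Keying the buffer by chunk index means a chunk redelivered while its message is still incomplete simply overwrites itself, and the result does not depend on arrival order. In a stream, a `Reassembler` could be created per materialization inside `statefulMapConcat`, e.g. `statefulMapConcat { () => val r = new Reassembler; chunk => r.offer(chunk).toList }`. Incomplete messages stay buffered indefinitely here, so a real consumer would also need eviction and a strategy for committing offsets only after a message is complete.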