scalaakkaapache-kafkaakka-streamakka-kafka

How do you return a future containing a list of messages after all available messages have been consumed from a Kafka topic?


I am probably missing the point of the Kafka Consumer but what I want to do is:

Consumer subscribes to a topic, grabs all messages within the topic and returns a Future with a list of all of those messages

The code I have written to try and accomplish this is

val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}

def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
  .map { message =>
    logger.info(s"Consuming ${message.record.value}")
    KafkaMessage(Some(message.record.key()), Some(message.record.value()))
  }
  .buffer(bufferSize, overflowStrategy)
  .runWith(sink)

The Future never returns though, it consumes the necessary messages and then continues to poll the topic repeatedly. Is there a way to return the Future and then close the consumer?


Solution

  • As Kafka is for streaming data, there is no such thing as "all messages" as new data can be appended to a topic at any point.

    I guess, there are two possible things you could do:

    1. check how many records got returned by the last poll and terminate or
    2. you would need to get "current end of log" via endOffsets, and compare this to the offset of the latest record per partition. If both match, then you can return.

    The first approach is simpler, but might have the disadvantage, that it's not as reliable as the second approach. Theoretically, a poll could return zero records, even if there are records available (even if the chances are not very high that this happens).

    Not sure, how to express this termination condition in Scala though (as I am not very familiar with Scala).