I am probably missing the point of the Kafka Consumer, but what I want to do is this:
A consumer subscribes to a topic, grabs all messages in the topic, and returns a Future containing a list of all of those messages.
The code I have written to try to accomplish this is:
val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
  list :+ kafkaMessage
}

def consume(topic: String) =
  Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
    .map { message =>
      logger.info(s"Consuming ${message.record.value}")
      KafkaMessage(Some(message.record.key()), Some(message.record.value()))
    }
    .buffer(bufferSize, overflowStrategy)
    .runWith(sink)
The Future never completes, though: the stream consumes the expected messages and then keeps polling the topic repeatedly. Is there a way to complete the Future and then close the consumer?
As Kafka is for streaming data, there is no such thing as "all messages", because new data can be appended to a topic at any point.
I guess there are two possible things you could do:
1. Check whether a poll returns zero records and, if so, terminate; or
2. At the start, get the current log-end offset per partition via endOffsets, and compare this to the offset of the latest record per partition (the end offset is one past the offset of the last record). If both match, then you can return.

The first approach is simpler, but it has the disadvantage that it is not as reliable as the second approach. Theoretically, a poll could return zero records even if there are records available (even if the chances of this happening are not very high).
I am not sure how to express this termination condition in Scala, though (as I am not very familiar with Scala).
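For what it's worth, the second approach could be sketched roughly as follows. This is only a sketch using the plain Kafka Java client from Scala rather than the stream-based consumer in the question. It assumes Scala 2.13 and a Kafka client of version 2.0 or later, and it assumes that KafkaMessage is a case class of two optional strings. The consumeAll name and the props parameter are made up for illustration; props is expected to carry the bootstrap servers and String deserializers.

```scala
import java.time.Duration
import java.util.Properties

import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Assumed shape of the question's KafkaMessage type.
final case class KafkaMessage(key: Option[String], value: Option[String])

// Hypothetical helper: reads every partition of `topic` from the beginning
// up to the end offsets observed at the start, then closes the consumer.
def consumeAll(topic: String, props: Properties)(
    implicit ec: ExecutionContext): Future[List[KafkaMessage]] =
  Future {
    // The consumer is not thread-safe, so it lives entirely inside this body.
    val consumer = new KafkaConsumer[String, String](props)
    try {
      val partitions = consumer.partitionsFor(topic).asScala
        .map(info => new TopicPartition(info.topic, info.partition))
        .asJava

      // Manual assignment: no consumer-group rebalancing is involved.
      consumer.assign(partitions)
      consumer.seekToBeginning(partitions)

      // Snapshot of the log-end offset per partition (last offset + 1).
      val endOffsets = consumer.endOffsets(partitions).asScala

      // Done once the next offset to fetch has reached the snapshot everywhere.
      def caughtUp: Boolean =
        endOffsets.forall { case (tp, end) => consumer.position(tp) >= end.longValue }

      val messages = ListBuffer.empty[KafkaMessage]
      while (!caughtUp) {
        consumer.poll(Duration.ofMillis(500)).asScala.foreach { record =>
          messages += KafkaMessage(Option(record.key), Option(record.value))
        }
      }
      messages.toList
    } finally consumer.close()
  }
```

Records appended after the end-offset snapshot may still show up in the final poll, so the result can contain slightly more than the snapshot covers. The loop blocks its thread while polling, so a dedicated ExecutionContext is probably a better fit than the global one.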