I've created a project to help with my understanding of Kafka. It's set up as three identical Rails apps, all running inside Docker, with Karafka configured to consume the messages: if you create a record in one, it's replicated across to the other two. I assumed that the `start_from_beginning` setting would mean that every time the Karafka server was restarted it would start from offset 0, but that doesn't seem to be the case. Can someone please explain what I've done wrong or correct my understanding?
Here are the two significant sections from `karafka.rb`:
```ruby
setup do |config|
  config.kafka.seed_brokers = %w[kafka://kafka:9092]
  config.client_id = "app_#{ ENV['APP_ID'] }"
  config.logger = Rails.logger
end
# ...
consumer_groups.draw do
  topic :party do
    consumer PartyConsumer
    start_from_beginning true
  end
end
```
I have already tried putting `config.kafka.start_from_beginning = true` in the config (`setup`) section of `karafka.rb`, but no joy.
When I create a record in one of the apps, it's synced over to the other two. This is what I was trying to do:
with `start_from_beginning = true`
At this point, I was expecting the database to be re-created from Kafka by rewinding to offset 0 and replaying all the messages. What have I missed?
The full project is here: https://github.com/jcleary/kafka-demo
The issue was in fact with my understanding of `start_from_beginning`. It only decides where a consumer group starts reading when it has no committed offsets, i.e. when the group is new. An existing consumer group always picks up from where it left off (its last committed offset), regardless of this setting.
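The rule can be modelled in a few lines of plain Ruby. This is a minimal sketch, not Karafka or ruby-kafka code: the method `starting_offset` and its keyword arguments are hypothetical, and it assumes a group resumes from its committed offset whenever one exists (ignoring edge cases such as an out-of-range offset), consulting the flag only when none does.

```ruby
# Hypothetical model of where a consumer group starts reading a partition.
# committed: the offset the group last committed, or nil if it never committed.
# earliest/latest: the partition's current log boundaries.
def starting_offset(committed:, start_from_beginning:, earliest:, latest:)
  # An existing group always resumes where it left off; the flag is ignored.
  return committed unless committed.nil?

  # Only a group with no committed offset looks at the flag.
  start_from_beginning ? earliest : latest
end

# First boot of a brand-new group: the flag matters.
starting_offset(committed: nil, start_from_beginning: true, earliest: 0, latest: 42)  # => 0

# Restart of the same group after committing offset 42: the flag is ignored.
starting_offset(committed: 42, start_from_beginning: true, earliest: 0, latest: 42)   # => 42
```

This is why restarting the Karafka server didn't rewind: the consumer group already had committed offsets.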
I found two ways to achieve what I was looking for:
1 - Using Kafka's seek function (recommended):
From the wiki: https://github.com/karafka/karafka/wiki/Events-monitoring-and-logging#using-the-connectionlistenerbefore_fetch_loop-for-topic-seeking
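Seeking moves the consumer's read position while keeping the same consumer group. The wiki page linked above does this from a `connection.listener.before_fetch_loop` hook; the sketch below does not use Karafka at all. It is an in-memory toy (the `ToyConsumer` class and the sample records are hypothetical) showing why seeking to offset 0 and replaying rebuilds local state, here a Hash standing in for the database.

```ruby
# Hypothetical in-memory stand-in for one partition plus a consumer.
# Not the Karafka or ruby-kafka API.
class ToyConsumer
  attr_reader :position

  def initialize(log, committed)
    @log = log              # array of messages; array index == offset
    @position = committed   # resume from the committed offset
  end

  # Move the read position, as a seek would, without changing the group.
  def seek(offset)
    @position = offset
  end

  # Yield every message from the current position to the end of the log.
  def each_message
    while @position < @log.size
      yield @log[@position]
      @position += 1
    end
  end
end

log = [
  { id: 1, name: "Alice's party" },
  { id: 2, name: "Bob's party" },
  { id: 3, name: "Carol's party" }
]

# The group has already consumed everything, so a plain restart replays nothing.
consumer = ToyConsumer.new(log, 3)
db = {}
consumer.each_message { |m| db[m[:id]] = m[:name] }
db.size # => 0

# Seeking back to offset 0 replays the whole log and rebuilds the "database".
consumer.seek(0)
consumer.each_message { |m| db[m[:id]] = m[:name] }
db.size # => 3
```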
2 - Changing the `client_id` in `karafka.rb`, e.g.:
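```ruby
require "securerandom" # already loaded in a Rails app; harmless otherwise

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://kafka:9092]
    # A fresh UUID on every boot gives this app a client_id it has never used
    config.client_id = "app_#{ ENV['APP_ID'] }-#{ SecureRandom.uuid }"
    config.logger = Rails.logger
  end
  # ...
end
```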
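Using `SecureRandom.uuid` ensures that your consumer has a unique `client_id` on every boot. Because Karafka includes the `client_id` in the consumer group id, each boot joins a brand-new group with no committed offsets, so `start_from_beginning` takes effect and all messages are reprocessed.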
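A small sketch of why the UUID trick works. It assumes, as Karafka 1.x's default consumer mapper does, that the group id is built as `"#{client_id}_#{group_name}"` (the real mapper also underscores the client id, which is omitted here); the `committed_offsets` hash, the `group_id` helper and the `example_group` name are hypothetical.

```ruby
require "securerandom"

# Hypothetical: committed offsets keyed by consumer group id, as the broker
# would store them. The group id format is an assumption for illustration.
committed_offsets = { "app_1_example_group" => 42 }

def group_id(client_id, group_name)
  "#{client_id}_#{group_name}"
end

stable_id = group_id("app_1", "example_group")
unique_id = group_id("app_1-#{SecureRandom.uuid}", "example_group")

committed_offsets[stable_id] # => 42  (existing group: resumes at offset 42)
committed_offsets[unique_id] # => nil (new group: start_from_beginning applies)
```

The flip side is that every boot creates another consumer group, and the abandoned groups' committed offsets stay on the broker until Kafka's offset retention period expires them.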
Option 2 is more of a hack but, depending on your use case, it might be what you're looking for.