ruby apache-kafka karafka

Karafka start_from_beginning not working as expected


I've created a project to help with my understanding of Kafka. It's set up as three identical Rails apps, all inside Docker, with Karafka configured to consume the messages: if you create a record in one, it's replicated across to the other two. I assumed that the start_from_beginning setting would mean that every time the Karafka server was restarted it would start from offset 0, but that does not seem to be the case. Can someone please explain what I've done wrong, or correct my understanding?

Here are the two significant sections from karafka.rb

  setup do |config|
    config.kafka.seed_brokers = %w[kafka://kafka:9092]
    config.client_id = "app_#{ ENV['APP_ID'] }"
    config.logger = Rails.logger
  end

...

  consumer_groups.draw do
    topic :party do
      consumer PartyConsumer
      start_from_beginning true
    end
  end

I have already tried putting config.kafka.start_from_beginning = true in the config section of karafka.rb but no joy.

When I create a record in one of the apps, it's synced over to the other two. This is what I was trying to do:

At this point, I was expecting the database to be re-created from Kafka by rewinding to offset 0 and replaying all the messages. What have I missed?

The full project is here: https://github.com/jcleary/kafka-demo


Solution

  • The issue was in fact with my understanding of start_from_beginning. Its purpose is to decide how a new consumer, one with no previously committed offset, should behave. Existing consumers are always expected to pick up from where they left off.

    I found two ways to achieve what I was looking for:

    1 - Using Kafka's seek function (recommended):

    From the wiki: https://github.com/karafka/karafka/wiki/Events-monitoring-and-logging#using-the-connectionlistenerbefore_fetch_loop-for-topic-seeking

    2 - Changing the client_id in karafka.rb, e.g.

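    require 'securerandom'

    class KarafkaApp < Karafka::App
      setup do |config|
        config.kafka.seed_brokers = %w[kafka://kafka:9092]
        # A fresh UUID on every boot gives the app a client_id Kafka has not seen before
        config.client_id = "app_#{ ENV['APP_ID'] }-#{ SecureRandom.uuid }"
        config.logger = Rails.logger
      end
      ...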

    Using SecureRandom.uuid ensures your consumer has a unique client_id on every start, so Kafka treats it as a brand-new consumer and, with start_from_beginning true, replays all the messages.

    Option 2 is more of a hack, but depending on your use case, might be what you're looking for.
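
Option 1 only links to the wiki, so here is a rough sketch of what such a seek listener might look like. It is written from memory of the Karafka 1.x wiki page linked above, so treat the details as assumptions to check against your version: that the `connection.listener.before_fetch_loop` event carries `:consumer_group` and `:client`, and that the client responds to `seek(topic, partition, offset)`. `RewindListener` and its constants are names made up for illustration, and the sketch assumes the `party` topic has a single partition (0).

```ruby
# Sketch of a listener that rewinds one topic partition before fetching starts.
# Assumptions (verify against the Karafka wiki for your version):
#   - the event payload exposes :consumer_group and :client
#   - the client responds to seek(topic, partition, offset)
class RewindListener
  TOPIC = 'party'   # topic name from the question
  PARTITION = 0     # assumes a single-partition topic
  OFFSET = 0        # replay from the first message

  # Called once per consumer group, just before its fetch loop begins.
  def self.on_connection_listener_before_fetch_loop(event)
    topic_names = event[:consumer_group].topics.map(&:name)
    # Skip consumer groups that do not consume the topic we want to rewind
    return unless topic_names.include?(TOPIC)

    event[:client].seek(TOPIC, PARTITION, OFFSET)
  end
end

# In karafka.rb, once the app class is defined (needs the karafka gem loaded):
# Karafka.monitor.subscribe(RewindListener)
```

Unlike option 2, this keeps the client_id stable, so no new consumer identity is created on each restart.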