apache-kafka apache-storm

Apache Storm with Kafka offset management


I have built a sample Storm topology that uses Kafka as a source, and I have run into a problem I need help with.

Every time I kill a topology and start it again, the topology starts processing from the beginning.

Suppose message A in topic X was processed by the topology, and then I kill the topology.

When I submit the topology again and message A is still in topic X, it is processed again.

Is there a solution, maybe some sort of offset management, to handle this situation?


Solution

  • You shouldn't use storm-kafka for new code. It is deprecated because the underlying client API is deprecated in Kafka and removed as of 2.0.0. Use storm-kafka-client instead.

    With storm-kafka-client, you want to set a group id and a first poll offset strategy:

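    // UNCOMMITTED_EARLIEST is a constant of the FirstPollOffsetStrategy enum.
    KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
                // The group id lets Kafka find the spout's committed offsets after a restart.
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .build();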
    

    With this configuration, the spout starts at the earliest offset the first time you start it, and picks up where it left off when you restart it. Kafka uses the group id to recognize the spout when it restarts, so the spout can get its stored offset checkpoint back. Other offset strategies behave differently; check the javadoc for the FirstPollOffsetStrategy enum.
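
    To make the restart behavior concrete, here is a minimal sketch of how the four strategies choose the first offset to read after the topology is submitted. It is a simplified model written for illustration, not Storm's actual code: the class FirstPollModel, the method startOffset, and its parameters are hypothetical names, and the rules follow my reading of the FirstPollOffsetStrategy javadoc.

```java
import java.util.OptionalLong;

// Simplified, hypothetical model of first-poll offset resolution (not Storm's implementation).
class FirstPollModel {
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * @param committed offset stored in Kafka for the group id; empty if the group never committed
     * @param beginning first offset still available in the partition
     * @param end       offset that the next produced record will receive
     * @return the offset the spout reads first after the topology is submitted
     */
    static long startOffset(Strategy strategy, OptionalLong committed, long beginning, long end) {
        switch (strategy) {
            case EARLIEST:
                return beginning;                    // ignores any committed offset
            case LATEST:
                return end;                          // ignores any committed offset
            case UNCOMMITTED_EARLIEST:
                return committed.orElse(beginning);  // resume if possible, else start from the beginning
            case UNCOMMITTED_LATEST:
                return committed.orElse(end);        // resume if possible, else read only new records
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // First submission: nothing committed yet, so reading starts at the beginning (prints 0).
        System.out.println(startOffset(Strategy.UNCOMMITTED_EARLIEST, OptionalLong.empty(), 0, 100));
        // Resubmission after offset 42 was committed: reading resumes there (prints 42).
        System.out.println(startOffset(Strategy.UNCOMMITTED_EARLIEST, OptionalLong.of(42), 0, 100));
    }
}
```

    In this model, the reprocessing described in the question corresponds to the cases where no committed offset is found or where it is ignored, for example when the group id changes between submissions.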

    The spout checkpoints how far it got periodically, and the config has a setting to control this. How it checkpoints is controlled by the setProcessingGuarantee setting in the config, which can be set to at-least-once (only checkpoint acked offsets), at-most-once (checkpoint before the spout emits the message), or "any times" (checkpoint periodically, ignoring acks).
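
    As an illustration of the at-least-once case, the sketch below models which offset can be checkpointed when only acked offsets count. It assumes the checkpoint can only advance over a gap-free run of acked offsets, which is my understanding of how the spout behaves rather than something stated above. The class AtLeastOnceModel and the method nextCommitOffset are hypothetical names, not part of storm-kafka-client.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of at-least-once checkpointing (not Storm's implementation).
class AtLeastOnceModel {
    /**
     * @param committed the currently committed offset, i.e. the next offset to read after a restart
     * @param acked     offsets whose tuples have been acked by the topology
     * @return the new offset to commit: the first offset at or after committed that is not acked
     */
    static long nextCommitOffset(long committed, Set<Long> acked) {
        long next = committed;
        while (acked.contains(next)) {
            next++; // advance only while the run of acked offsets has no gap
        }
        return next;
    }

    public static void main(String[] args) {
        // Offsets 5, 6 and 8 are acked, 7 is still pending.
        Set<Long> acked = new HashSet<>(Arrays.asList(5L, 6L, 8L));
        // The checkpoint stops at 7 (prints 7); after a restart, 7 and 8 are read again.
        System.out.println(nextCommitOffset(5, acked));
    }
}
```

    In this model, offset 8 is replayed after a restart even though it was acked. That is where the "at least once" in the name comes from: bolts may see some messages twice, but none are skipped.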

    Take a look at one of the example topologies included with Storm: https://github.com/apache/storm/blob/dc56e32f3dcdd9396a827a85029d60ed97474786/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L93