scala, apache-spark, apache-kafka

Kafka offsets in Spark


Kafka's enable.auto.commit is set to false and the Spark version is 2.4.

  1. If using the latest offset, do we need to manually find the last offset details and pass them to createDirectStream() in the Spark application, or will it automatically pick up the latest offset? In either case, do we need to find the last offset details manually?

  2. Is there any difference between using SparkSession.readStream.format("kafka")... and KafkaUtils.createDirectStream()?

  3. When using the earliest offset option, will the offset be picked up automatically?


Solution

  • Here is my attempt to answer your questions:

    1. Ques 1: enable.auto.commit is a Kafka parameter; when set to false it requires you to commit (i.e. update) your offsets manually, for example to the checkpoint directory. If your application restarts, it will look into the checkpoint directory and start reading from the last committed offset + 1. The same is described by jaceklaskowski here: https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-properties-enable-auto-commit.html. There is no need to specify the offset anywhere in your Spark application; all you need is the checkpoint directory. Also remember that offsets are maintained per partition of a topic per consumer group, so it would be unreasonable for Spark to expect developers/users to provide them by hand. A sketch of this pattern is shown after this list.
    2. Ques 2: spark.readStream is a generic method for reading from streaming sources such as TCP sockets, Kafka topics, etc., and belongs to Structured Streaming, while KafkaUtils is a class dedicated to Spark–Kafka integration in the older DStream API. I usually use KafkaUtils myself, though I haven't run any performance benchmarks. Note that both can subscribe to more than one topic: KafkaUtils takes a collection of topics, and readStream accepts a comma-separated list in its subscribe option. A sketch of the readStream variant also follows this list.
    3. Ques 3: earliest means your consumer will start reading from the oldest record still available. For example, if your topic is new (no cleanup has happened) or cleanup is not configured for the topic, reading starts at offset 0; if cleanup is configured and all records up to offset 2000 have been removed, reading starts at offset 2001, even though the topic may hold records up to offset 10000 (this assumes a single partition; in topics with multiple partitions the offset values differ per partition). See the section on batch queries here for more details: https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html. The last sketch below shows the startingOffsets option.
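
To illustrate the first answer, here is a minimal sketch of a Spark 2.4 DStream job with enable.auto.commit=false that checkpoints its state and commits offsets manually after each batch, following the pattern in the Spark + Kafka integration docs. The broker address, group id, topic name, and checkpoint path are placeholders, not values from the question.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object ManualOffsetCommit {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("manual-offset-commit")
    val ssc  = new StreamingContext(conf, Seconds(10))
    // On restart, state is recovered from here; you never pass offsets yourself.
    ssc.checkpoint("/tmp/checkpoint-dir") // placeholder path

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",    // placeholder broker
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "my-consumer-group", // placeholder group id
      "auto.offset.reset"  -> "latest",            // only used when no committed offset exists
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("my-topic"), kafkaParams) // placeholder topic
    )

    stream.foreachRDD { rdd =>
      // Capture the offset ranges before transforming the RDD.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach(record => println(record.value)) // process the batch
      // Commit only after the batch succeeds; this is the manual commit
      // that enable.auto.commit=false makes your responsibility.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```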
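For the second answer, this is what the Structured Streaming equivalent looks like via spark.readStream.format("kafka"); note the comma-separated subscribe option covering two topics. Again, the broker address, topic names, and checkpoint location are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

object StructuredKafkaRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("structured-kafka-read").getOrCreate()
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "topic1,topic2")                // multiple topics are allowed
      .option("startingOffsets", "latest")                 // applies only on the first run
      .load()

    // Kafka records arrive as binary key/value columns; cast the value to a string.
    val values = df.selectExpr("CAST(value AS STRING)").as[String]

    val query = values.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/ss-checkpoint")  // offsets are tracked here per query
      .start()

    query.awaitTermination()
  }
}
```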
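For the third answer, a minimal sketch of the startingOffsets option in Structured Streaming: "earliest" starts from the oldest retained record, and a JSON assignment can pin per-partition offsets (where -2 means earliest and -1 means latest, per the linked integration guide). The topic name, partition count, and offset values are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object EarliestOffsets {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("earliest-offsets").getOrCreate()

    // "earliest" = start from the oldest record still retained: offset 0 on a
    // fresh topic, or offset 2001 if cleanup has removed everything up to 2000.
    val fromEarliest = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "my-topic")                     // placeholder topic
      .option("startingOffsets", "earliest")
      .load()

    // Offsets are per partition, so an explicit assignment is a JSON map of
    // topic -> partition -> offset; this assumes my-topic has two partitions.
    val fromExplicit = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", """{"my-topic":{"0":2001,"1":-2}}""")
      .load()

    fromEarliest.printSchema()
    fromExplicit.printSchema()
    spark.stop()
  }
}
```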