apache-spark spark-structured-streaming spark-kafka-integration

Structured Streaming startingOffsets and Checkpoint


I am confused about startingOffsets in Structured Streaming. The official docs list two query types for this option:

  1. Streaming - is this continuous streaming?
  2. Batch - is this for queries with foreachBatch or triggers? (latest is not allowed)

My workflow also has checkpoints enabled. How does this work together with startingOffsets? If my workflow crashes and I have startingOffsets set to latest, does Spark check the Kafka offsets, the Spark checkpoint offsets, or both?


Solution

  • Streaming by default means "micro-batch" in Spark. Depending on the trigger you set, it checks the source for new data at the given interval. You create a streaming query with readStream:

    val df = spark
      .readStream
      .format("kafka")
      .[...]
    

    For Kafka there is also the experimental continuous trigger, which processes data with very low latency. See the section Continuous Processing in the documentation.
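
    As a minimal sketch of how the trigger selects the processing mode: the broker address localhost:9092, the topic name events, and the /tmp checkpoint paths are placeholders I made up, and the spark-sql-kafka-0-10 package is assumed to be on the classpath.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder.appName("trigger-sketch").getOrCreate()

    // Hypothetical broker address and topic name.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Micro-batch: check the source for new data every 10 seconds.
    val microBatch = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/micro-batch")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    // Continuous (experimental): process records as they arrive and
    // write a checkpoint every second.
    val continuous = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/continuous")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    spark.streams.awaitAnyTermination()
    ```

    Each query gets its own checkpoint location, even though both read from the same source.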

    Batch, on the other hand, works like reading a text file (such as a CSV) that you read once. You can do this by using

    val df = spark
      .read
      .format("kafka")
      .[...]
    

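    Note the difference between readStream for streaming and read for batch processing. In batch mode, startingOffsets cannot be set to latest; it defaults to earliest (a JSON string with explicit per-partition offsets is also accepted). Batch queries do not use checkpoints, so even if you configure checkpointing, every planned or unplanned restart reads again from the configured starting offsets.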
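
    A batch read might look like the following sketch. The broker address and topic name are hypothetical; startingOffsets and endingOffsets are spelled out only to make the batch defaults visible.

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("kafka-batch-sketch").getOrCreate()

    // Hypothetical broker address and topic name.
    val batchDf = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "earliest") // batch default; "latest" is rejected here
      .option("endingOffsets", "latest")     // batch default
      .load()

    // Kafka keys and values arrive as binary, so cast them for display.
    batchDf
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "partition", "offset")
      .show(false)
    ```

    Running this twice reads the same offset range twice (plus whatever arrived in between), because nothing records how far the previous run got.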

    Checkpointing in Structured Streaming is configured in the writeStream part (the checkpointLocation option), and the location must be unique for every single query (for example, when you run multiple streaming queries from the same source). Once that checkpoint location is set up and you restart your application, Spark only looks at those checkpoint files. The startingOffsets option is consulted only when the query starts for the very first time. So after a crash with startingOffsets set to latest, the query resumes from the checkpointed offsets, not from the latest offsets in Kafka.

    Remember that Structured Streaming never commits any offsets back to Kafka. It only relies on its checkpoint files. See my other answer on How to manually set group.id and commit kafka offsets in spark structured streaming?.
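
    To illustrate the interaction between startingOffsets and the checkpoint, here is a sketch under assumed names: the broker address, topic, output path, and checkpoint path are all hypothetical.

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("checkpoint-sketch").getOrCreate()

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
      .option("subscribe", "events")                       // hypothetical topic
      // Consulted only on the very first start, when the checkpoint
      // directory is still empty.
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("parquet")
      .option("path", "/tmp/output/events")
      // On every restart Spark resumes from the offsets stored here.
      // Use a different directory for every streaming query.
      .option("checkpointLocation", "/tmp/checkpoints/events-to-parquet")
      .start()

    query.awaitTermination()
    ```

    Deleting or changing the checkpoint directory makes the next start behave like a first start again, so startingOffsets applies once more.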

    If you plan to run your application, say, once a day, it is therefore best to use readStream with checkpointing enabled and the trigger writeStream.trigger(Trigger.Once()). A good explanation for this approach is given in a Databricks blog on Running Streaming Jobs Once a Day For 10x Cost Savings.
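
    A once-a-day job along these lines could be sketched as follows. Again, the broker address, topic, and paths are made-up placeholders, not values from the question.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder.appName("daily-kafka-job").getOrCreate()

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
      .option("subscribe", "events")                       // hypothetical topic
      .option("startingOffsets", "earliest")               // first run only
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("parquet")
      .option("path", "/tmp/output/daily-events")
      .option("checkpointLocation", "/tmp/checkpoints/daily-events")
      // Process everything that arrived since the last checkpoint, then stop.
      .trigger(Trigger.Once())
      .start()

    // Returns once the single batch has finished, so the job can exit.
    query.awaitTermination()
    ```

    Each scheduled run picks up where the previous one stopped, because the offsets come from the checkpoint rather than from startingOffsets. Newer Spark releases (3.3 and later) also provide Trigger.AvailableNow(), which serves the same purpose but can split the backlog into several batches.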