apache-spark, apache-kafka, apache-spark-sql, offset, spark-structured-streaming

How to get Kafka offsets for a structured query for manual and reliable offset management?


Spark 2.2 introduced a Kafka structured streaming source. As I understand it, it relies on the HDFS checkpoint directory to store offsets and guarantee "exactly-once" message delivery.

But older docs (like https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) say that Spark Streaming checkpoints are not recoverable across applications or Spark upgrades, and hence not very reliable. As a solution, there is a practice of storing offsets in external storage that supports transactions, such as MySQL or Redshift.

If I want to store offsets from the Kafka source in a transactional DB, how can I obtain the offsets for a structured streaming batch?

Previously, this could be done by casting the RDD to HasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    

But with the new Structured Streaming API, I have a Dataset of InternalRow and I can't find an easy way to fetch the offsets. The Sink API has only the addBatch(batchId: Long, data: DataFrame) method, so how am I supposed to get the offsets for a given batch id?
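
For reference, here is a minimal sketch of the custom sink I would have to implement (the class names are made up for illustration; the traits are org.apache.spark.sql.execution.streaming.Sink and org.apache.spark.sql.sources.StreamSinkProvider):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical sink that should write rows and Kafka offsets to a DB in one transaction
class TransactionalDbSink(parameters: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Only batchId and data are provided here -- no offsets in sight
  }
}

class TransactionalDbSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new TransactionalDbSink(parameters)
}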


Solution

  • The relevant Spark DEV mailing list discussion thread is here.

    Summary from it:

    Spark Structured Streaming will support getting offsets in future versions (> 2.2.0). The JIRA ticket to follow: https://issues-test.apache.org/jira/browse/SPARK-18258

    For Spark <= 2.2.0, you can get the offsets for a given batch by reading the JSON from the checkpoint directory (the API is not stable, so be cautious):

    import org.apache.hadoop.fs.Path
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.sql.execution.streaming.OffsetSeqLog
    
    val checkpointRoot = parameters("checkpointLocation")  // read 'checkpointLocation' from custom sink params
    val checkpointDir = new Path(new Path(checkpointRoot), "offsets").toUri.toString
    val offsetSeqLog = new OffsetSeqLog(sparkSession, checkpointDir)
    
    // The offset log has one entry per source; for a single Kafka source,
    // deserialize its JSON into a Map[TopicPartition, Long]
    val endOffsets: Map[TopicPartition, Long] = offsetSeqLog.get(batchId)
      .map(_.offsets.flatten.flatMap(o => JsonUtilsWrapper.jsonToOffsets(o.json)).toMap)
      .getOrElse(Map.empty)
    
    
    package org.apache.spark.sql.kafka010
    
    import org.apache.kafka.common.TopicPartition
    
    /**
      * Hack to access the private[kafka010] JsonUtils API.
      * This object must live in the org.apache.spark.sql.kafka010 package.
      */
    object JsonUtilsWrapper {
      // Map[TopicPartition, Long] -> JSON, e.g. {"my-topic":{"0":123}}
      def offsetsToJson(partitionOffsets: Map[TopicPartition, Long]): String = {
        JsonUtils.partitionOffsets(partitionOffsets)
      }
    
      // JSON -> Map[TopicPartition, Long]
      def jsonToOffsets(str: String): Map[TopicPartition, Long] = {
        JsonUtils.partitionOffsets(str)
      }
    }
    

    This endOffsets map will contain the end (until) offset for each topic/partition. Getting the start offsets is problematic, because you would have to read the 'commits' checkpoint dir as well. But usually you don't care about start offsets, because storing end offsets is enough for a reliable Spark job restart.
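
    For example, if you have to restart the job with a fresh checkpoint (say, after an upgrade), you could feed the stored end offsets back to the Kafka source via its startingOffsets option. This is only a sketch; the broker address, topic name, and storedOffsets values are placeholders:

    import org.apache.kafka.common.TopicPartition
    
    // Offsets previously loaded from your transactional storage (placeholder values)
    val storedOffsets: Map[TopicPartition, Long] = Map(
      new TopicPartition("my-topic", 0) -> 123L,
      new TopicPartition("my-topic", 1) -> 456L)
    
    val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "my-topic")
      // Same JSON format the Kafka source expects, e.g. {"my-topic":{"0":123,"1":456}}
      .option("startingOffsets", JsonUtilsWrapper.offsetsToJson(storedOffsets))
      .load()

    Note that startingOffsets only applies when a query starts without existing checkpoint state; with an intact checkpoint, Spark resumes from the checkpointed offsets instead.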

    Please note that you have to store the processed batch id in your storage as well. Spark can re-run a failed batch with the same batch id in some cases, so make sure to initialize the custom Sink with the latest processed batch id (which you should read from external storage) and ignore any batch with id < latestProcessedBatchId. By the way, the batch id is not unique across queries, so you have to store the batch id for each query separately.
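
    A minimal sketch of that guard in the custom sink (OffsetStore is a hypothetical DAO over your transactional DB; the check assumes latestProcessedBatchId is the id of the last batch whose data and offsets were committed, so a replay of that exact batch is skipped as well):

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Sink
    
    // Hypothetical DAO over the transactional DB, keyed by query name
    trait OffsetStore {
      def latestBatchId(queryName: String): Long
      def commit(queryName: String, batchId: Long,
                 endOffsets: Map[TopicPartition, Long], data: DataFrame): Unit
    }
    
    class IdempotentDbSink(queryName: String, offsetStore: OffsetStore) extends Sink {
      // Read once at sink creation: id of the last batch committed for this query
      private val latestProcessedBatchId: Long = offsetStore.latestBatchId(queryName)
    
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        // Skip batches already committed for this query (including a replay of the last one)
        if (batchId <= latestProcessedBatchId) return
    
        // End offsets for this batch, read from the checkpoint's offset log as shown earlier
        val endOffsets: Map[TopicPartition, Long] = readEndOffsets(batchId)
    
        // Rows, end offsets, and the batch id go into the DB in a single transaction
        offsetStore.commit(queryName, batchId, endOffsets, data)
      }
    
      // Wraps the OffsetSeqLog lookup from the earlier snippet (omitted here for brevity)
      private def readEndOffsets(batchId: Long): Map[TopicPartition, Long] = ???
    }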