apache-spark spark-streaming checkpointing

Why does Spark throw "SparkException: DStream has not been initialized" when restoring from checkpoint?


I am restoring a stream from an HDFS checkpoint (a ConstantInputDStream, for example), but I keep getting SparkException: <X> has not been initialized.

Is there something specific I need to do when restoring from a checkpoint?

I can see that it wants DStream.zeroTime to be set, but when the stream is restored zeroTime is null. It doesn't get restored, possibly because it is a private member; I don't know. I can see that the StreamingContext referenced by the restored stream does have a value for zeroTime.

initialize is a private method that is called by StreamingContext.graph.start but not by StreamingContext.graph.restart, presumably because restart expects zeroTime to have been persisted.

Does someone have an example of a stream that recovers from a checkpoint and has a non-null value for zeroTime?

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)

Solution

  • The problem was that I created the dstreams after the StreamingContext had been recreated from the checkpoint, i.e. after StreamingContext.getOrCreate. The dstreams and all their transformations should have been created inside createStreamingContext.

    The issue was filed as [SPARK-13316], "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards.
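
    A minimal sketch of the corrected layout, with the whole streaming graph built inside the factory function so that it is either created fresh or restored from the checkpoint as a unit. The object name, checkpoint path, host, port, app name and the body of foreachRDD are assumptions made for illustration; the original post elides them.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoveryExample {
  // Assumed placeholder values; replace with your own HDFS path and source.
  val checkpointDir = "/tmp/checkpoint-recovery-example"
  val host = "localhost"
  val port = 9999

  // Everything that defines the streaming graph lives in this factory.
  // getOrCreate only calls it when no usable checkpoint exists.
  def createStreamingContext(): StreamingContext = {
    val sparkConf = new SparkConf()
      .setAppName("CheckpointRecoveryExample")
      .setIfMissing("spark.master", "local[2]") // assumption: local run
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    // The dstream and its output operation are created here,
    // not after getOrCreate returns.
    val socketStream = ssc.socketTextStream(host, port)
    socketStream.foreachRDD { rdd =>
      println(s"Batch with ${rdd.count()} records")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores the context (and its dstreams) from the checkpoint if present,
    // otherwise builds it with the factory above.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

    After getOrCreate returns, the code only starts the context; it does not add any further dstreams or transformations.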