I am restoring a stream from an HDFS checkpoint (a ConstantInputDStream, for example) but I keep getting SparkException: <X> has not been initialized.
Is there something specific I need to do when restoring from a checkpoint?
I can see that it wants DStream.zeroTime to be set, but when the stream is restored zeroTime is null. It doesn't get restored, possibly because it is a private member, though I'm not sure. I can see that the StreamingContext referenced by the restored stream does have a value for zeroTime.
initialize is a private method and gets called at StreamingContext.graph.start but not by StreamingContext.graph.restart, presumably because it expects zeroTime to have been persisted.
Does someone have an example of a stream that recovers from a checkpoint and has a non-null value for zeroTime?
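// Builds a fresh context; getOrCreate only calls this when no checkpoint exists in checkpointDir
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Duration(1000))
  ssc.checkpoint(checkpointDir)
  ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
// The stream and its output operation are defined here, after getOrCreate
val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)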
The problem was that I created the DStreams after the StreamingContext had been recreated from the checkpoint, i.e. after StreamingContext.getOrCreate. The DStreams and all their transformations should have been created inside createStreamingContext.
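A minimal sketch of the corrected layout, in case it helps: everything that defines the DStream graph moves into createStreamingContext, and the code after getOrCreate only starts the context. The checkpoint path, host, port, app name, and the body of foreachRDD are placeholders I made up for illustration; they are not from my actual job. It also assumes the master is supplied by spark-submit.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoveryExample {
  // Hypothetical values, chosen only for illustration.
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"
  val host = "localhost"
  val port = 9999

  // Called by getOrCreate only when no checkpoint exists in checkpointDir.
  // All DStreams, transformations, and output operations are defined here,
  // so they become part of the graph that is written to the checkpoint.
  def createStreamingContext(): StreamingContext = {
    // Assumes the master URL is provided externally (e.g. by spark-submit).
    val sparkConf = new SparkConf().setAppName("CheckpointRecoveryExample")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    val socketStream = ssc.socketTextStream(host, port)
    socketStream.checkpoint(Seconds(1))
    socketStream.foreachRDD { rdd =>
      // Placeholder output operation.
      println(s"Received ${rdd.count()} lines")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart the context and its DStream graph come from the checkpoint,
    // so no streams are defined after this call.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createStreamingContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this layout nothing is added to the context after it has been recovered, which is what triggered the exception in my original code.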
The issue was filed as [SPARK-13316] "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards.