I have a Spark job, written in Scala, that fails occasionally with this message:
org.apache.spark.SparkException: Job aborted due to stage failure:
A shuffle map stage with indeterminate output was failed and retried.
However, Spark cannot rollback the ResultStage 20 to re-process the input data,
and has to fail this job. Please eliminate the indeterminacy by checkpointing
the RDD before repartition and try again.
The code looks like this:
val rdd = getRDD(...)
val output = rdd.repartition(NumPartitions).mapPartitions { ... }.collect
I wasn't really familiar with checkpointing, but found some instances of it elsewhere in my company's codebase, and copied it into my own job:
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
val rdd = getRDD(...)
rdd.checkpoint()
val output = rdd.repartition(NumPartitions).mapPartitions { ... }.collect
However, the job still intermittently fails with the exact same message. The stack trace points to the line containing the repartition call, and in any case the job contains no other repartition calls.
Anyone have a clue how I can be getting the same error even after adding a checkpoint?
Review your transformations to make sure they are deterministic: for DataFrame code you can inspect the plan with explain(), and for RDD code rdd.toDebugString shows the lineage feeding the shuffle. Avoid operations that rely on random number generation or system time, as these produce different output when a stage is retried and so introduce indeterminacy.
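Not a full diagnosis of your job, but here is a minimal, self-contained sketch of what that review looks like in practice. The dataset, the object name, and the example transformations below are made up for illustration; only the toDebugString / repartition calls correspond to anything in your snippet:

import scala.util.Random
import org.apache.spark.sql.SparkSession

object DeterminismCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("determinism-check").getOrCreate()
    val sc = spark.sparkContext

    val base = sc.parallelize(1 to 1000000, numSlices = 8)

    // Indeterminate: a retried task produces different values than the original
    // attempt, so the shuffle output cannot be reproduced after a failure.
    val nondeterministic = base.map(x => (x, Random.nextDouble()))

    // Deterministic alternative: derive the value from the record itself,
    // so a retried task regenerates exactly the same rows.
    val deterministic = base.map(x => (x, (x * 2654435761L) % 1000))

    // Inspect the lineage feeding the shuffle (the RDD-side equivalent of explain()).
    println(deterministic.toDebugString)

    val output = deterministic.repartition(16).mapPartitions(_.take(5)).collect()
    println(output.length)

    spark.stop()
  }
}

Printing toDebugString on the RDD right before the repartition is also a quick way to see exactly what lineage the shuffle depends on.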