csv, apache-spark, apache-spark-sql, apache-spark-mllib, spark-csv

Inconsistent behaviour when attempting to write Dataframe to CSV in Apache Spark


I'm trying to output the optimal hyperparameters of a decision tree classifier trained with Spark's MLlib to a CSV file using DataFrames and spark-csv. Here's a snippet of my code:

import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator

// Split the data into training and test sets (10% held out for testing)
val Array(trainingData, testData) = assembledData.randomSplit(Array(0.9, 0.1))

// Define cross validation with a hyperparameter grid
val crossval = new CrossValidator()
    .setEstimator(classifier)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(new BinaryClassificationEvaluator)
    .setNumFolds(10)

// Train model
val model = crossval.fit(trainingData)

// Find best hyperparameter combination and create an RDD 
val bestModel = model.bestModel
val hyperparamList = new ListBuffer[(String, String)]()
bestModel.extractParamMap().toSeq.foreach { pair =>
    hyperparamList += ((pair.param.name, pair.value.toString))
}
val hyperparameters = spark.sparkContext.parallelize(hyperparamList.toSeq)

// Print the best hyperparameters
bestModel.extractParamMap().toSeq.foreach { pair =>
    println(s"${pair.param.parent} ${pair.param.name}")
    println(pair.value)
}

// Define csv path to output results
val csvPath: String = "/root/results/decision-tree"
val hyperparametersPath: String = csvPath+"/hyperparameters"
val hyperparametersFile: File = new File(hyperparametersPath)
val results = (hyperparameters, hyperparametersPath, hyperparametersFile)

// Convert the RDD of (name, value) tuples to a DataFrame and write it as csv
val dfToSave = spark.createDataFrame(results._1)
dfToSave.write.format("csv").mode("overwrite").save(results._2)

// Stop spark session
spark.stop()

After the Spark job finishes, I can see the part-00*... and _SUCCESS files inside the path as expected. However, although there are 13 hyperparameters in total in this case (confirmed by printing them on screen), cat-ing the CSV files shows that not every hyperparameter was written:

user@master:~$ cat /root/results/decision-tree/hyperparameters/part*.csv
checkpointInterval,10
featuresCol,features
maxDepth,5
minInstancesPerNode,1

Also, the hyperparameters that do get written change on every execution. This runs on an HDFS-based Spark cluster with 1 master and 3 workers with identical hardware. Could it be a race condition? If so, how can I solve it?
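
For reference, each partition of the data is written out as its own part file, so the partition count says how many files to expect. A quick check with the standard getNumPartitions API:

// Each RDD/DataFrame partition becomes one part-00N file on save,
// so these counts give the number of part files the write should produce.
println(hyperparameters.getNumPartitions)
println(dfToSave.rdd.getNumPartitions)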

Thanks in advance.


Solution

  • I think I figured it out. I expected dfToSave.write.format("csv").save(path) to write everything to the master node, but since tasks are distributed across the workers, each worker saves its partitions of the hyperparameters to a CSV in its own local filesystem. Because in my case the master node is also a worker, I can see its share of the hyperparameters there. The "inconsistent behaviour" (i.e. seeing different parts in each execution) comes from however Spark happens to schedule partitions across the workers on a given run.

    My solution will be to collect the CSVs from all workers with something like scp or rsync and rebuild the full results; two Spark-side alternatives are sketched below.
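
    Since the hyperparameter list is tiny and already lives on the driver, one alternative is to skip the distributed writer entirely and dump the file with plain Java I/O. A minimal sketch, reusing hyperparamList from the question (the output path is illustrative):

    import java.io.{File, PrintWriter}

    // hyperparamList already sits in driver memory, so write it directly;
    // no executor is involved and the file lands on the driver's filesystem.
    val outFile = new File("/root/results/decision-tree/hyperparameters.csv")
    outFile.getParentFile.mkdirs()
    val writer = new PrintWriter(outFile)
    try {
        hyperparamList.foreach { case (name, value) => writer.println(s"$name,$value") }
    } finally {
        writer.close()
    }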
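
    The other is to keep the distributed write but point it at shared storage, so every worker's partitions land in the same place; coalesce(1) additionally collapses the output to a single part file. A sketch, assuming a hypothetical namenode at master:9000:

    // An explicit hdfs:// URI sends all partitions to shared storage instead
    // of each worker's local disk; coalesce(1) yields a single part file.
    dfToSave.coalesce(1)
        .write.format("csv")
        .mode("overwrite")
        .save("hdfs://master:9000/results/decision-tree/hyperparameters")

    The merged output can then be fetched in one piece, e.g. with hdfs dfs -getmerge.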