How can you write to multiple outputs dependent on the key using Spark in a single job?
Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job
E.g.

    sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
      .writeAsMultiple(prefix, compressionCodecOption)

would ensure cat prefix/1 is

    a
    b

and cat prefix/2 would be

    c
NOTE: If you are new to Scala or Spark, this answer is written in correct Scala style and compiles. The top-voted answer is still very good, and please do upvote it, but note that it isn't idiomatic Scala.
This version includes the codec and the "pimp my library" pattern as requested, along with the necessary imports.
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
    implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
      def writeAsMultiple(prefix: String, codec: String,
                          keyName: String = "key")
                         (implicit sqlContext: SQLContext): Unit = {
        import sqlContext.implicits._

        rdd.toDF(keyName, "_2").write.partitionBy(keyName)
          .format("text").option("codec", codec).save(prefix)
      }
    }
    val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
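One nice property of this layout is that Spark can read it back using partition discovery. A minimal sketch, assuming the same sqlContext and the default "key" column name:

    // Sketch: reading the partitioned output back.
    // Partition discovery recovers the key column from the key=<value>
    // directory names, so each line comes back paired with its key.
    val df = sqlContext.read.format("text").load("prefix")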
One subtle difference from the OP's requirement is that it will prefix <keyName>= to the directory names. E.g.

    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

would give:

    prefix/key=1/part-00000
    prefix/key=2/part-00000

where prefix/key=1/part-00000 would contain the lines a and b, and prefix/key=2/part-00000 would contain the line c.
And

    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

would give:

    prefix/foo=1/part-00000
    prefix/foo=2/part-00000
It should be clear how to edit this for Parquet.
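As a rough sketch of what that edit might look like (a hypothetical variant, not from the answer above; it swaps the text format and Hadoop codec class for Parquet's own compression option):

    // Hypothetical Parquet variant of the write: partition by the key column
    // and use Parquet's built-in compression option ("snappy", "gzip", etc.)
    // instead of a Hadoop codec class name.
    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
      .format("parquet").option("compression", "gzip").save(prefix)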
Finally, below is an example for Dataset, which is perhaps nicer than using Tuples.
    import org.apache.spark.sql.Dataset

    implicit class PimpedDataset[T](dataset: Dataset[T]) {
      def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
        dataset.write.partitionBy(field)
          .format("text").option("codec", codec).save(prefix)
      }
    }
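Usage might look like the following (the case class and field name here are hypothetical, assuming a SparkSession's implicits are in scope; note the text format requires exactly one string column besides the partition column):

    // Hypothetical usage: a Dataset of a case class, partitioned by one field.
    case class Record(key: Int, value: String)

    val ds = Seq(Record(1, "a"), Record(1, "b"), Record(2, "c")).toDS()
    ds.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "key")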