Tags: scala, hadoop, output, hdfs, apache-spark

Write to multiple outputs by key Spark - one Spark job


How can you write to multiple outputs, dependent on the key, using Spark in a single job?

Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job

E.g.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .writeAsMultiple(prefix, compressionCodecOption)

would ensure cat prefix/1 is

a
b

and cat prefix/2 would be

c

NOTE: If you are new to Scala or Spark, note that this answer uses correct Scala style and compiles. The top-voted answer is still very good, and please do upvote it, but be aware that it isn't idiomatic Scala.
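
For context, the top-voted answer takes the lower-level Hadoop route via MultipleTextOutputFormat. A minimal sketch of that style of approach (not a verbatim copy of that answer; KeyBasedOutput is an illustrative name, and the keys are stringified so the plain String/String saveAsHadoopFile overload applies):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Writes each record under a directory named after its key, and drops the
// key from the written lines.
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
}

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .map { case (k, v) => (k.toString, v) }
  .saveAsHadoopFile("prefix", classOf[String], classOf[String], classOf[KeyBasedOutput])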


Solution

  • This includes the codec as requested, the necessary imports, and a pimp (enrich-my-library implicit class) as requested.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext
    
    // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
    implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
      def writeAsMultiple(prefix: String, codec: String,
                          keyName: String = "key")
                         (implicit sqlContext: SQLContext): Unit = {
        import sqlContext.implicits._
    
        // partitionBy writes one directory per distinct key and drops the
        // key column from the data, leaving the single string column that
        // the "text" data source requires.
        rdd.toDF(keyName, "_2").write.partitionBy(keyName)
          .format("text").option("codec", codec).save(prefix)
      }
    }
    
    val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
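
    If you are trying this in a plain spark-shell, note that writeAsMultiple needs an implicit SQLContext in scope. A minimal setup sketch, assuming the shell's sc (in Spark 2.x, SparkSession's sqlContext can be used instead):

    // Bring an implicit SQLContext into scope so writeAsMultiple compiles.
    implicit val sqlContext: SQLContext = new SQLContext(sc)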
    

    One subtle difference from the OP's request is that this will prefix <keyName>= to the directory names. E.g.

    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")
    

    Would give:

    prefix/key=1/part-00000
    prefix/key=2/part-00000
    

    where prefix/key=1/part-00000 would contain the lines a and b, and prefix/key=2/part-00000 would contain the line c.
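
    A convenient side effect of this Hive-style directory naming, assuming your Spark version performs partition discovery for the text source (recent versions do), is that the key can be recovered when reading the output back:

    // Partition discovery turns the key=1 and key=2 directories back into
    // a "key" column alongside the text source's "value" column.
    val readBack = sqlContext.read.format("text").load("prefix")
    readBack.show()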

    And

    myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")
    

    Would give:

    prefix/foo=1/part-00000
    prefix/foo=2/part-00000
    

    It should be clear how to edit this for Parquet; a sketch follows.
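
    For instance, a minimal sketch of a Parquet variant (PimpedParquetRDD and the column name "value" are illustrative choices; Parquet brings its own compression, so the text codec option is dropped):

    implicit class PimpedParquetRDD[T1, T2](rdd: RDD[(T1, T2)]) {
      def writeAsMultipleParquet(prefix: String, keyName: String = "key")
                                (implicit sqlContext: SQLContext): Unit = {
        import sqlContext.implicits._
    
        rdd.toDF(keyName, "value").write.partitionBy(keyName)
          .format("parquet").save(prefix)
      }
    }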

    Finally, below is an example for Dataset, which is perhaps nicer than using Tuples.

    import org.apache.spark.sql.Dataset
    
    implicit class PimpedDataset[T](dataset: Dataset[T]) {
      def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
        // As above, partitionBy drops `field` from the data, so for the
        // "text" format the Dataset needs exactly one other string column.
        dataset.write.partitionBy(field)
          .format("text").option("codec", codec).save(prefix)
      }
    }
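
    For example, a hypothetical usage sketch (KeyedLine is an illustrative case class, and a SparkSession named spark is assumed):

    case class KeyedLine(key: Int, value: String)
    
    import spark.implicits._
    val ds = Seq(KeyedLine(1, "a"), KeyedLine(1, "b"), KeyedLine(2, "c")).toDS()
    ds.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "key")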