I have the following DataFrame input from an S3 file and need to transform the data into the desired output shown below. I am using Spark version 1.5.1 with Scala, but could change to Spark with Python. Any suggestions are welcome.
DataFrame Input:
name animal data
john mouse aaaaa
bob mouse bbbbb
bob mouse ccccc
bob dog ddddd
Desired Output:
john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv
terminal$ cat bob/mouse/file.csv
bbbbb
ccccc
terminal$ cat bob/dog/file.csv
ddddd
Here is the Spark Scala code that I have tried:
val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)
Current Output:
[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]
One problem with my existing code is that groupBy returns a GroupedData object, and I don't really want to run a count/sum/agg function on that data. I am looking for a better technique to group and write out the data. The dataset is very large.
This can be achieved using the partitionBy option of the DataFrameWriter. The general syntax is as follows:
df.write.partitionBy("name", "animal").format(...).save(...)
Unfortunately, the only plain text format which supports partitioning in Spark 1.5 is JSON.
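For example, a minimal sketch of a partitioned JSON write on 1.5 (the output path is a placeholder; note that partitionBy produces Hive-style directories such as name=bob/animal=mouse/ rather than bob/mouse/):

// the partition columns are encoded in the directory names and dropped from the data,
// so each part file contains only the remaining "data" column
df.select("name", "animal", "data")
  .write
  .partitionBy("name", "animal")
  .format("json")
  .save("/prefix/output")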
If you can upgrade your Spark installation:

- 1.6 - partitionBy with the text format (see the sketch below). 1.6 is also required if you need a single output file per group (repartition).
- 2.0 - partitionBy with the csv format.

I believe that in 1.5 your best option is to write the files as JSON and convert the individual output files.
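For reference, once you are on 1.6, a minimal sketch of the text-based variant could look like this (the output path is a placeholder; the text source writes the single remaining string column after the partition columns are stripped out, and repartitioning by the same columns aims for one file per group):

import org.apache.spark.sql.functions.col

// Spark 1.6+ only: repartition by the grouping columns so each group lands in
// a single task, then let partitionBy split the output into directories
df.select("name", "animal", "data")
  .repartition(col("name"), col("animal"))
  .write
  .partitionBy("name", "animal")
  .text("/prefix/output")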
If the number of distinct ('name', 'animal') pairs is small, you can try to perform a separate write for each group:
import org.apache.spark.sql.Row
import sqlc.implicits._  // enables the $"..." column syntax

// collect the distinct (name, animal) pairs to the driver
val dist = df.select("name", "animal").distinct.rdd.collect.map {
  case Row(name: String, animal: String) => (name, animal)
}

// one filtered write per group; on 1.5 the csv format requires the external
// spark-csv package ("com.databricks.spark.csv"), built-in csv arrived in 2.0
for {
  (name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
  .select($"data").write.format("csv").save(s"/prefix/$name/$animal")
but this won't scale as the number of combinations grows, since each group requires a separate filter-and-write pass over the data.