I have a fairly large DataFrame (about a million rows), and the requirement is to store each row in a separate JSON file.
For this DataFrame:
root
|-- uniqueID: string
|-- moreData: array
Every row should be written to:
s3://.../folder[i]/<uniqueID>.json
where i is the first letter of the row's uniqueID.
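The target layout can be pinned down as a small pure function. This is a minimal sketch under one assumption: that folder[i] means a sub-directory named after the first character of uniqueID, so an ID such as XC lands in `<base>/X/XC.json`. `OutputLayout` and `outputKey` are hypothetical names used only for illustration.

```scala
object OutputLayout {
  // Assumption: "folder[i]" is a sub-directory named after the first
  // character of uniqueID, e.g. <base>/X/XC.json.
  def outputKey(base: String, uniqueID: String): String = {
    require(uniqueID.nonEmpty, "uniqueID must not be empty")
    // stripSuffix avoids a double slash when base already ends in "/"
    s"${base.stripSuffix("/")}/${uniqueID.head}/$uniqueID.json"
  }
}
```

Keeping the mapping in one function means any later step (writing or renaming) derives the same path from the same ID.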
I have looked at other questions and solutions, but they don't satisfy my requirements. I am trying to do this in a time-efficient way, and from what I have read so far, repartitioning is not a good option.
I tried writing the DataFrame with the maxRecordsPerFile option, but I can't control how the files are named:
df.write.mode("overwrite")
.option("maxRecordsPerFile", 1)
.json(outputPath)
I am fairly new to Spark, so any help is much appreciated.
I don't think there is really an optimized (if we take that to mean "much faster than any other") method of doing this. It's a fundamentally inefficient operation: a million rows means a million tiny files, each with its own fixed overhead (open, write, close, and on S3 a separate request), and I can't really see a good use case for it. But, assuming you really have thought this through and decided this is the best way to solve the problem at hand, I would suggest you reconsider the repartition method on the DataFrame; besides a number of partitions, it can take a column to be used as the partitioning expression. The only thing it won't do is split the files across directories the way you want.
I suppose something like this might work:
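import org.apache.spark.sql.functions.col
import spark.implicits._ // `spark` is your SparkSession (predefined in spark-shell); needed for toDF

// dummy data; the third column stands in for uniqueID
val df = Seq(("A", "B", "XC"), ("D", "E", "YF"), ("G", "H", "ZI"), ("J", "K", "ZL"), ("M", "N", "XO")).toDF("FOO", "BAR", "uniqueID")

// List of all possible prefixes for the uniqueID column. To derive it from the data instead, use e.g.
// df.select(substring(col("uniqueID"), 1, 1)).distinct().as[String].collect().toList
// (which also needs import org.apache.spark.sql.functions.substring)
val prefixes = List("X", "Y", "Z")

// replace with your path
val basePath = "/.../data"

prefixes.foreach { p =>
  val data = df.filter(col("uniqueID").startsWith(p))
  val n = data.count.toInt
  if (n > 0) { // repartition(0) is not allowed, so skip prefixes with no rows
    data.repartition(n) // spread the records over n partitions (roughly one each)
      .write
      .option("maxRecordsPerFile", 1) // guarantees exactly one record per output file
      .format("json")
      .save(s"$basePath/$p")
  }
}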
The above doesn't quite meet the requirement, since you can't control the output file names (see note 1 below). We can use a shell script to fix the file names afterward. This assumes the output is on a local filesystem and that you are running in an environment where bash and jq are available.
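#!/usr/bin/env bash
set -euo pipefail
# replace with the path that contains the directories to process
cd /.../data
for sub_data_dir in ./*/; do
  cd "${sub_data_dir}"
  # remove Spark's marker file and checksum files (the latter exist only on a local filesystem)
  rm -f _SUCCESS .*.crc
  for f in ./part-*.json; do
    # each file holds exactly one JSON record; use its uniqueID as the file name
    uuid="$(jq -r '.uniqueID' "${f}")"
    mv "${f}" "${uuid}.json"
  done
  cd ..
done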
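If bash and jq aren't available (on Windows, for example), the same clean-up step can be sketched on the JVM. This is a minimal sketch, not a drop-in tool: it assumes a local filesystem with one sub-directory per prefix, as produced by the Spark code above, and, as a deliberate simplification, it pulls uniqueID out of each file with a regular expression instead of a real JSON parser, so it only works when the value contains no escaped quotes. `RenameParts`, `extractId` and `renameAll` are hypothetical names.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object RenameParts {
  // Simplification (assumption): find uniqueID with a regex rather than a JSON
  // parser; only valid if the value contains no escaped quotes.
  private val IdPattern = "\"uniqueID\"\\s*:\\s*\"([^\"]+)\"".r

  def extractId(json: String): Option[String] =
    IdPattern.findFirstMatchIn(json).map(_.group(1))

  // listFiles() returns null for non-directories or I/O errors
  private def filesIn(dir: File): List[File] =
    Option(dir.listFiles()).map(_.toList).getOrElse(Nil)

  /** For every sub-directory of `root`: delete Spark's _SUCCESS marker and
    * .crc checksum files, then rename each part-*.json file to <uniqueID>.json.
    * Returns the renamed files. */
  def renameAll(root: File): List[File] =
    filesIn(root).filter(_.isDirectory).flatMap { dir =>
      filesIn(dir)
        .filter(f => f.getName == "_SUCCESS" || f.getName.endsWith(".crc"))
        .foreach(_.delete())
      filesIn(dir)
        .filter(f => f.getName.startsWith("part-") && f.getName.endsWith(".json"))
        .flatMap { part =>
          val json = new String(Files.readAllBytes(part.toPath), StandardCharsets.UTF_8)
          extractId(json).map { id =>
            val target = new File(dir, s"$id.json")
            Files.move(part.toPath, target.toPath) // throws if target already exists
            target
          }
        }
    }
}
```

For example, `RenameParts.renameAll(new File("/.../data"))` renames the files and returns their new locations. Because `Files.move` refuses to overwrite, duplicate uniqueIDs surface as an exception rather than silently losing a record. For output on S3, the renames would have to go through the Hadoop FileSystem API instead of java.nio.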
1: Spark doesn't give you an option to control individual file names when using DataFrame.write, because that isn't how it is meant to be used. The intended usage is on a multi-node cluster (such as Hadoop) where data may be distributed arbitrarily between the nodes. The write operation is coordinated among all nodes and targets a path on shared storage such as HDFS. In that setting it makes no sense to talk about individual files: the operation is performed at the DataFrame level, so you can only control the name of the directory the output files are written to (the argument to the save method).