scalaapache-sparkapache-spark-sqlaws-glue-spark

Writing each row in a spark dataframe to a separate json


I have a fairly large dataframe(million rows), and the requirement is to store each of the row in a separate json file.

For this data frame

 root
 |-- uniqueID: string 
 |-- moreData: array 

The output should be stored like below for all the rows.

s3://.../folder[i]/<uniqueID>.json

where i is the first letter of the uniqueID

I have looked at other questions and solutions, but they don't satisfy my requirements. Trying to do this in a more time optimized way, and from what I have read so far re-partition is not a good option.

Tried writing the df with maxRecordsPerFile option, but I can't seem to control the naming of the files.

df.write.mode("overwrite")
.option("maxRecordsPerFile", 1)
.json(outputPath)

I am fairly new to spark, any help is much appreciated.


Solution

  • I don't think there is really an optimized (if we take that to mean "much faster than any other") method of doing this. It's fundamentally an inefficient operation, and one that I can't really see a good use case for. But, assuming you really have thought this through and decided this is the best way to solve the problem at hand, I would suggest you reconsider using the repartition method on the dataframe; it can take a column to be used as the partitioning expression. The only thing it won't do is split files across directories the way you want.

    I suppose something like this might work:

    import java.io.File
    import scala.reflect.io.Directory
    
    // dummy data
    val df = Seq(("A", "B", "XC"), ("D", "E", "YF"), ("G", "H", "ZI"), ("J", "K", "ZL"), ("M", "N", "XO")).toDF("FOO", "BAR", "BAZ")
    
    // List of all possible prefixes for the index column. If you need to generate this
    // from the data, replace this with a query against the input dataframe to do that.
    val prefixes = List("X", "Y", "Z")
    
    // replace with your path
    val path = "/.../data"
    
    prefixes.foreach{p =>
      val data = df.filter(col("uniqueID").startsWith(p))
      val path = new Directory(new File(f"$path/$p"))
      data.repartition(data.count.toInt) // repartition the dataframe with 1 record per partition
      data.write.format("json").save(path)
    }
    

    The above doesn't quite meet the requirement since you can't control the output file name1. We can use a shell script to fix the file names afterward. This assumes you are running in an environment with bash and jq available.

    #!/usr/bin/env bash
    
    # replace with the path that contains the directories to process
    cd /.../data
    
    for sub_data_dir in ./*; do
      cd "${sub_data_dir}"
      rm _SUCCESS
      for f in ./part-*.json; do
        uuid="$(jq -r ."uniqueID" "${f}")"
        mv "${f}" "${uuid}"
      done
      cd ..
    done
    

    1: Spark doesnt give you an option to control individual file names when using Dataframe.write because that isn't how it is meant to be used. The intended usage is on a multi-node Hadoop cluster where data may be distributed arbitrarily between the nodes. The write operation is coordinated among all nodes and targets a path on the shared HDFS. In that case it makes no sense to talk about individual files because the operation is performed on the dataframe level, and so you can only control the naming of the directory where the output files will be written (as the argument to the save method)