scala, apache-spark, apache-spark-sql, json4s

How to convert Spark Dataframe to JSON using json4s, in Scala?


I'm trying to convert a DataFrame to a JSON string, but the output is just {}. Not sure what I'm doing wrong?

This is just a test, but the full DataFrame schema I need to use has 800+ columns, so I don't want to have to specify each field explicitly in the code if possible! The code runs in a locked-down corporate environment where I can't read or write files to the system, so the output has to be a string.

import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write

implicit val formats = DefaultFormats

val test = spark.sql("SELECT field1, field2, field3 FROM myTable LIMIT 2")

println("Output:")
write(test)


Output:
res12: String = {}

To add insult to injury, I could use the built-in toJSON method on Dataset, but our corporate environment has spark.sql.jsonGenerator.ignoreNullFields set to true and it can't be changed, while the output has to include null fields - hoping json4s can oblige :)

Thanks


Solution

  • Not sure what I'm doing wrong?

    That's because spark.sql(...) returns a DataFrame, and all instance variables of DataFrame are private, so your serializer basically just ignores them. You can see the same behavior with any private field:

    case class PrivateStuff(private val thing: String)
    
    write(PrivateStuff("something"))
    // outputs {}
    

    So you can't just convert a whole DataFrame to JSON. What you can do instead is collect your data (which returns Array[Row] or List[Row]), convert each Row into a Scala object (serializing the rows directly is probably not what you want), and then use the write function:

    import org.apache.spark.sql.Row
    
    case class YourModel(x1: String, ...)
    
    object YourModel {
      def fromRow(row: Row): Option[YourModel] = ??? // conversion logic here
    }
    
    val myData: Array[YourModel] = spark.sql("SELECT ...")
      .collect()
      .map(YourModel.fromRow)
      .collect { case Some(value) => value }
    
    write(myData)
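As a side note on the null-fields concern: json4s itself serializes null fields of case classes, independently of Spark's spark.sql.jsonGenerator.ignoreNullFields setting. A minimal sketch outside Spark (the Example class and its fields are made up for illustration):

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.write

implicit val formats: DefaultFormats.type = DefaultFormats

// Hypothetical model; field2 is deliberately left null
case class Example(field1: String, field2: String)

// The null field is expected to appear as "field2":null in the output,
// rather than being dropped
val json = write(Example("a", null))
println(json)
```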
    

    Update


    After your explanation about the number of columns, it doesn't make sense to create case classes. You can use the json method of the Row class instead to achieve the same thing (and it doesn't care about spark.sql.jsonGenerator.ignoreNullFields):

    import spark.implicits._ // provides the Encoder[String] required by map
    
    val test = spark.sql("SELECT field1, field2, field3 FROM myTable LIMIT 2")
    
    val jsonDF = test.map(_.json)
    

    This is a Dataset of JSON strings: you can collect them, show them, or combine them into a single string - basically anything you can do with a DataFrame.
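Since the output has to be a single string rather than files, one way to finish (a sketch, assuming the spark session and myTable from the question) is to collect the per-row JSON objects and join them into one JSON array string:

```scala
import org.apache.spark.sql.SparkSession

// Assumes `spark` is an active SparkSession and `myTable` exists;
// both names are taken from the question, not verified here.
val test = spark.sql("SELECT field1, field2, field3 FROM myTable LIMIT 2")

import spark.implicits._ // Encoder[String] for the map below

// Row.json keeps null fields regardless of spark.sql.jsonGenerator.ignoreNullFields
val jsonStrings: Array[String] = test.map(_.json).collect()

// Join the per-row objects into a single JSON array string
val jsonArray: String = jsonStrings.mkString("[", ",", "]")

println(jsonArray)
```

Note that collect() pulls all rows to the driver, which is fine for a LIMIT 2 test but worth keeping in mind for larger result sets.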