Tags: scala, apache-spark, multidimensional-array, apache-spark-sql

Flatten nested JSON in a Scala Spark DataFrame


I have multiple JSON documents coming in from various REST APIs, and I don't know their schemas in advance. I am unable to use the DataFrame explode function, because I don't know the names of the columns that Spark creates when it infers the schema.

1. Can we recover the keys of the nested array elements by decoding dataframe.schema.fields? Spark only provides the value part in the rows of the DataFrame and takes the top-level key as the column name.

DataFrame:

+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+

2. Is there an optimal way to flatten the JSON using DataFrame methods, determining the schema at runtime?

Sample JSON:

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}
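
For reference, the DataFrame above was created by letting Spark infer the schema at load time (the file name sample.json is just a placeholder; the multiLine option is needed because each document spans multiple lines):

    val df = spark.read.option("multiLine", true).json("sample.json")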

Note: we need to do all the operations on DataFrames, because a huge amount of data is coming in and we cannot afford to parse each JSON document individually.


Solution

  • Try to avoid flattening all columns unless you really need to.

    I created a helper function so that you can call df.explodeColumns directly on a DataFrame.

    The code below flattens multi-level array and struct type columns.

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import scala.annotation.tailrec
    
    implicit class DFHelpers(df: DataFrame) {
        // One level of struct flattening: replace each struct column with its
        // fields, renamed to <column>_<field>; pass other columns through.
        def columns = {
          df.schema.fields.flatMap {
            case column if column.dataType.isInstanceOf[StructType] =>
              column.dataType.asInstanceOf[StructType].fields.map { field =>
                col(s"${column.name}.${field.name}").as(s"${column.name}_${field.name}")
              }.toList
            case column => List(col(s"${column.name}"))
          }
        }
    
        // Repeat the one-level flattening until no struct columns remain.
        def flatten: DataFrame =
          if (df.schema.exists(_.dataType.isInstanceOf[StructType]))
            df.select(columns: _*).flatten
          else
            df
    
        // Explode every array column (explode_outer keeps rows whose arrays
        // are null or empty), flatten any structs this uncovers, and repeat
        // until no array columns are left.
        def explodeColumns = {
          @tailrec
          def loop(cdf: DataFrame): DataFrame =
            cdf.schema.fields.filter(_.dataType.typeName == "array") match {
              case arrays if arrays.nonEmpty =>
                loop(arrays.foldLeft(cdf)((dfa, field) =>
                  dfa.withColumn(field.name, explode_outer(col(s"${field.name}"))).flatten))
              case _ => cdf
            }
          loop(df.flatten)
        }
    }
    
    // Exiting paste mode, now interpreting.
    
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import scala.annotation.tailrec
    defined class DFHelpers
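
    A note on the design: explode_outer is used instead of explode so that rows whose arrays are null or empty are kept (with null values) rather than silently dropped, and explode and flatten are applied in a loop because exploding an array of structs exposes a struct column that may itself contain further nested arrays.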
    
    

    Flattened Columns

    scala> df.printSchema
    root
     |-- stackoverflow: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- tag: struct (nullable = true)
     |    |    |    |-- author: string (nullable = true)
     |    |    |    |-- frameworks: array (nullable = true)
     |    |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |    |-- id: long (nullable = true)
     |    |    |    |    |    |-- name: string (nullable = true)
     |    |    |    |-- id: long (nullable = true)
     |    |    |    |-- name: string (nullable = true)
    
    
    scala> df.explodeColumns.printSchema
    root
     |-- author: string (nullable = true)
     |-- frameworks_id: long (nullable = true)
     |-- frameworks_name: string (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
    
    scala>
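
    To materialize the flattened rows as well as the schema (a usage sketch; with the sample data, each tag row is duplicated once per entry in its frameworks array, giving four rows):

    scala> df.explodeColumns.show(false)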