Tags: json, scala, apache-spark, delta-lake

Retrieve a stored JSON string from a Delta table column using Scala and Apache Spark


Problem To Solve

I need to extract a JSON string from a Delta table and eventually parse it. The show function can be used to inspect the data, but for processing it needs to be parsed into a Map or a case class.

The data is inserted from a JSON file into a Delta table. The target column is of type String. During this process only the values end up in the column, not the keys from the JSON array, so a select query returns what looks like partial data. Is this the default behavior of a Delta table?

e.g.

The data to store:        { "name" : "Sample" }
What actually gets stored: { "Sample" }

Background

I have already referred to the following SO questions and followed the instructions there, but could not find a solution:

Question 1

Question 2

The data was inserted into the Delta table from a JSON file.

One of the fields is a JSON array, and it is supposed to be inserted as-is.

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.functions._

    // createOrReplaceTempView returns Unit, so there is nothing useful to assign here
    spark.read
      .option("multiLine", true)
      .option("mode", "PERMISSIVE")
      .option("dropFieldIfAllNull", false)
      .json(FILE_PATH)
      .createOrReplaceTempView("data")

    val data = spark.sql("""SELECT * FROM data""")

    val inputDataFrame = data
      .select(
        col("Id"),
        col("Version"),
        col("StartDate"),
        col("EndDate"),
        explode(col("Configuration")).alias("Config")
      )
      .withColumn("ConfigId", col("Config.Id"))
      // ... further transformations elided ...
      .toDF

    val deltaTable = DeltaTable.forPath(.....)

    deltaTable
      .as("Config")
      .merge(
        inputDataFrame.as("input"),
        "Config.ConfigId = input.ConfigId" // hypothetical join condition; elided in the original
      )
      .whenMatched
      .update(Map("ConfigData" -> col("input.Config")))
      .whenNotMatched
      .insert(Map("ConfigData" -> col("input.Config")))
      .execute()

After running a select query, the data shows up without keys; only the values from the inserted JSON string are present.

Method 1

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val arrayOfStringSchema = ArrayType(StringType)

val configData = df.select("Config").as[String]
configData.show(false) // shows the Config column: values only, no keys

val desiredData = configData.withColumn("Config", from_json(col("Config"), arrayOfStringSchema))
desiredData.show(false) // shows null: from_json could not parse the stored string

Method 2

val json_schema = spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .option("dropFieldIfAllNull", false)
  .json(df.select("Config").as[String])
  .schema

val config = configData.withColumn("Config", from_json(col("Config"), json_schema))
config.show(false)   // still shows the Config column without keys
config.printSchema() // shows the following:

/*
root
 |-- Config: struct (nullable = true)
 |    |-- _corrupt_record: string (nullable = true)
*/

// _corrupt_record is Spark's PERMISSIVE-mode fallback column: schema inference
// found no valid JSON in the stored strings.

Platform

Scala 2.12.18

Apache Spark 3.5.1

What is missing here? Is there a problem with the insert, or is there some other way to achieve this?

EDIT 1: Apparently the schema is inferred automatically when the data is stored, and it is not persisted in the Delta table, so when fetching the data it is unclear where a schema would come from. Essentially, what is needed is a Map that makes it possible to fetch individual fields from the JSON string stored in the Delta table column.
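If the stored string were valid JSON, one schema-free way to get such a Map is to parse the column into a MapType. A minimal sketch (the column name Config follows the question, and it assumes the JSON values are strings):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Parse a JSON string column into a Map[String, String] without knowing the
// schema up front; rows that are not valid JSON come back as null.
val withMap = df.withColumn(
  "ConfigMap",
  from_json(col("Config"), MapType(StringType, StringType))
)

withMap.select(col("ConfigMap")("name")).show(false) // fetch a single key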


Solution

  • Let me explain the problem in a nutshell:

    The data from a JSON file is read and inserted into a Delta table column.

    The column is of type String.

    It was observed that when the dataframe containing the JSON data is inserted into the Delta table column as-is, the schema is inferred automatically, all the keys from the JSON structure are dropped, and only the values are stored (see the reproduction sketch below).

    This made it difficult to parse the data when it was read back from the Delta table column later.
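    This is consistent with the struct column being cast to String on write: Spark's string rendering of a struct keeps only the values. A minimal sketch reproducing the symptom (assumes a local SparkSession; the sample data and schema are made up):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().appName("repro").master("local[*]").getOrCreate()
    import spark.implicits._

    // Parse a JSON string into a struct, then cast the struct back to a string,
    // as happens implicitly when a struct is written into a String column.
    val schema = StructType(Seq(StructField("name", StringType)))
    val parsed = Seq("""{"name":"Sample"}""").toDF("raw")
      .select(from_json($"raw", schema).as("Config"))

    parsed.select($"Config".cast("string")).show(false) // prints {Sample}: keys are gone
    parsed.select(to_json($"Config")).show(false)       // prints {"name":"Sample"}: keys preserved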

    Solution

    to_json: convert the struct column to a JSON string before storing it in the Delta table column, so the keys are preserved (see the sketch below).

    from_json: convert the stored JSON string column back into a struct column when reading.
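    Applied to the merge in the question, this means wrapping the struct column in to_json before it is written, so the JSON text (keys included) lands in the String column. A sketch; the join condition is hypothetical since it was elided in the question:

    deltaTable
      .as("Config")
      .merge(
        inputDataFrame.as("input"),
        "Config.ConfigId = input.ConfigId" // hypothetical join condition
      )
      .whenMatched
      .update(Map("ConfigData" -> to_json(col("input.Config")))) // store JSON text, not a cast struct
      .whenNotMatched
      .insert(Map("ConfigData" -> to_json(col("input.Config"))))
      .execute()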

    val json_schema = spark.read
      .option("multiLine", true)
      .option("mode", "PERMISSIVE")
      .option("dropFieldIfAllNull", false)
      .json(df.select("Config").as[String]) // infer the schema from the stored JSON strings
      .schema

    val struct_json = df.withColumn("StructuredConfig", from_json(col("Config"), json_schema))

    struct_json.printSchema // shows the desired schema
    

    Now StructuredConfig.name or StructuredConfig.id can be selected from the dataframe, which was not possible previously.
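    For example (field names follow the ones used in the question; adjust them to the actual inferred schema):

    struct_json
      .select(col("StructuredConfig.id"), col("StructuredConfig.name"))
      .show(false)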