Problem To Solve
I need to extract a JSON string from a Delta table and eventually parse it. The show function can be used to see the data, but it needs to be extracted into a Map or a case class for processing.
The data is inserted from a JSON file into the Delta table. The column in the table is of type String. In this process only the values end up in the Delta table column, not the keys from the JSON array, so a select query returns only partial data, so to speak. Is this the default behavior of a Delta table?
e.g.
The data to store: { "name" : "Sample" }. What actually gets stored: { "Sample" }
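Presumably this is what happens when a struct (or array of structs) column is written into a String column: the implicit cast keeps only the values. A minimal sketch that reproduces the rendering, using a named_struct literal purely for illustration (not the actual data):

import org.apache.spark.sql.functions.col

// Casting a struct to STRING drops the field names and keeps only the values:
// the output is {Sample}, not {"name" : "Sample"}
spark.sql("SELECT named_struct('name', 'Sample') AS Config")
  .select(col("Config").cast("string").alias("ConfigAsString"))
  .show(false)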
Background
I have already referred to the following SO questions and followed the instructions there, but could not find a solution.
The data was inserted into the Delta table from a JSON file.
One of the fields is a JSON array and it is supposed to be inserted as is.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{col, explode}

// Read the JSON file and register it as a temporary view
val jsonData = spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .option("dropFieldIfAllNull", false)
  .json(FILE_PATH)
jsonData.createOrReplaceTempView("data")

val data = spark.sql("""SELECT * FROM data""")

// Explode the Configuration array so each element becomes a row
val inputDataFrame = data
  .select(
    col("Id"),
    col("Version"),
    col("StartDate"),
    col("EndDate"),
    explode(col("Configuration")).alias("Config")
  )
  .withColumn("ConfigId", col("Config.Id"))
  // ... further transformations elided ...
  .toDF

val deltaTable = DeltaTable.forPath(.....)

deltaTable
  .as("Config")
  .merge(
    inputDataFrame.as("input"),
    ..... // merge (join) condition elided here
  )
  .whenMatched
  .update(Map("ConfigData" -> col("input.Config")))
  .whenNotMatched
  .insert(Map("ConfigData" -> col("input.Config")))
  .execute()
After running a select query, the data shows up with only the values from the inserted JSON string and without the keys.
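For context, the DataFrame df used in the two attempts below is assumed to be read back from the Delta table roughly as follows; the path placeholder and the ConfigData-to-Config alias are assumptions, since that step is not shown here:

import org.apache.spark.sql.functions.col

// DELTA_TABLE_PATH is a placeholder for the same path passed to DeltaTable.forPath above
val df = spark.read
  .format("delta")
  .load(DELTA_TABLE_PATH)
  .select(col("ConfigData").alias("Config"))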
Method 1
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType}
import spark.implicits._ // needed for .as[String]

val arrayOfStringSchema = ArrayType(StringType)
val configData = df.select("Config").as[String]
configData.show(false) // shows the Config column, data without keys
// from_json yields null because the stored string does not match this schema
val desiredData = configData.withColumn("Config", from_json(col("Config"), arrayOfStringSchema))
desiredData.show(false) // shows null
Method 2
// Infer the schema from the stored strings themselves
val json_schema = spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .option("dropFieldIfAllNull", false)
  .json(df.select("Config").as[String])
  .schema

val config = configData.withColumn("Config", from_json(col("Config"), json_schema))
config.show(false)   // shows the Config column, data without keys
config.printSchema() // shows the following
/*
root
 |-- Config: struct (nullable = true)
 |    |-- _corrupt_record: string (nullable = true)
*/
The inferred schema contains only _corrupt_record, i.e. none of the stored strings could be parsed as valid JSON, because the keys were already lost at insert time.
Platform
Scala 2.12.18
Apache Spark 3.5.1
What is missing here? Is there a problem with the insert, or is there some other way to achieve this?
EDIT 1 Apparently, when the data is stored the schema is inferred automatically and it is not persisted in the Delta table, so when fetching the data it is unclear where the schema should come from. Basically, a Map is needed that makes it possible to fetch fields from the JSON string stored as a column in the Delta table (a sketch of this is shown after the summary below).
Let me explain the problem in a nutshell:
The data from a JSON file is read and inserted into a Delta table column.
The column is of type String.
It was observed that when the DataFrame containing the JSON data is inserted as is into the Delta table column, the schema is inferred automatically, all the keys from the JSON structure are dropped, and only the values are stored.
This made it difficult to parse the data when it was read back later from the Delta table column.
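For reference, this is the kind of Map-based parsing meant above; a minimal sketch that assumes the column actually holds valid JSON objects with flat string values (which is only the case once the to_json fix from the solution below is applied):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{MapType, StringType}

// Parse each JSON string into a map<string,string>; null indicates an unparsable row
val asMap = df.withColumn("ConfigMap", from_json(col("Config"), MapType(StringType, StringType)))
asMap.select(col("ConfigMap").getItem("name")).show(false)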
Solution
to_json: used to convert the DataFrame column to a JSON string before storing it in the Delta table column
from_json: used to convert the stored string column back into a structured column
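On the write side this means wrapping the struct column in to_json before the merge. A minimal sketch of that change; the inputWithJson and ConfigJson names are illustrative, not from the original code:

import org.apache.spark.sql.functions.{col, to_json}

// Serialize the Config struct to a JSON string so the keys survive in the String column
val inputWithJson = inputDataFrame.withColumn("ConfigJson", to_json(col("Config")))

In the merge shown earlier, "ConfigData" -> col("input.ConfigJson") then replaces col("input.Config") in both the update and the insert maps.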
// Infer the schema from the (now valid) JSON strings in the column
val json_schema = spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .option("dropFieldIfAllNull", false)
  .json(df.select("Config").as[String])
  .schema

// Parse the stored string column back into a struct using the inferred schema
val struct_json = df.withColumn("StructuredConfig", from_json(col("Config"), json_schema))
struct_json.printSchema // shows the desired schema
Now StructuredConfig.name or StructuredConfig.id can be selected from the DataFrame, which was not possible previously.
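For example (assuming the inferred schema contains name and id fields, as in the original data):

struct_json
  .select(col("StructuredConfig.name"), col("StructuredConfig.id"))
  .show(false)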