Below is my sample schema.
|-- provider: string (nullable = true)
|-- product: string (nullable = true)
|-- asset_name: string (nullable = true)
|-- description: string (nullable = true)
|-- creation_date: string (nullable = true)
|-- provider_id: string (nullable = true)
|-- asset_id: string (nullable = true)
|-- asset_class: string (nullable = true)
|-- Actors: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Actors_Display: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Audio_Type: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Billing_ID: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Bit_Rate: array (nullable = true)
| |-- element: string (containsNull = false)
|-- CA_Rating: array (nullable = true)
| |-- element: string (containsNull = false)
I need to explode all the array-type columns. I have 80+ columns, and the set of columns keeps changing. I am currently using explode_outer(arrays_zip(...)):
// Select the fixed columns and explode the zipped arrays into a struct column (default name "col")
val df = sourcedf.select($"provider", $"asset_name", $"description", $"creation_date", $"provider_id", $"asset_id", $"asset_class", $"product", $"eligible_platform", $"actors", $"category",
  explode_outer(arrays_zip($"Actors_Display", $"Audio_Type", $"Billing_ID", $"Bit_Rate", $"CA_Rating")))
// Flatten the struct fields back into top-level columns
val parsed_output = df.select(col("provider"), col("asset_name"), col("description"), col("creation_date"), col("product"),
  col("provider_id"), col("asset_id"), col("asset_class"),
  col("col.Actors_Display"), col("col.Audio_Type"), col("col.Billing_ID"), col("col.Bit_Rate"), col("col.CA_Rating"))
With the above I am able to get the output, but it works for only one particular file. In my case new columns are added frequently. Is there a function that can explode multiple array columns for a changing schema and also select the non-array columns from the file? Could someone please give an example?
Note: Only the array columns keep changing; the rest are constant.
Below is the sample data
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ADIL>
<Meta>
<AMS Asset_Name="asd" Provider="Level" Product="MOTD" Version_Major="1" Version_Minor="0" Description="ZXC" Creation_Date="2009-12-03" Provider_ID="qwer.com" Asset_ID="A12we" Asset_Class="package"/>
<App_Data App="MOD" Name="Actors" Value="CableLa1.1"/>
<App_Data App="MOD" Name="Actors_Display" Value="RTY"/>
<App_Data App="MOD" Name="Audio_Type" Value="FGH"/>
</Meta>
<Asset>
<Meta>
<AMS Asset_Name="bnm" Provider="Level Film" Product="MOTD" Version_Major="1" Version_Minor="0" Description="bnj7" Creation_Date="2009-12-03" Provider_ID="levelfilm.com" Asset_ID="DDDB0610072533182333" Asset_Class="title"/>
<App_Data App="rt" Name="Billing_ID" Value="2020-12-29T00:00:00"/>
<App_Data App="MOD" Name="Bit_Rate" Value="2021-12-29T23:59:59"/>
<App_Data App="MOD" Name="CA_Rating" Value="16.99"/>
</Meta>
<Asset>
<Meta>
<AMS Asset_Name="atysd" Provider="Level1" Product="MOTD2" Version_Major="1" Version_Minor="0" Description="ZXCY" Creation_Date="2009-12-03" Provider_ID="qweDFtrr.com" Asset_ID="A12FGwe" Asset_Class="review"/>
This is the XML data. Initially, the data is parsed: every Name attribute value becomes a column name, and the corresponding Value attribute becomes that column's value. The XML has repeating tags, so parsing produces array columns; I use collect_list at the end of the parsing logic.
This is the sample output after parsing.
+------------------+--------------+---------------+----------+-----------+
|Actors            |Actors_Display|Audio_Type     |Billing_ID|Bit_Rate   |
+------------------+--------------+---------------+----------+-----------+
|["movie","cinema"]|["Dolby 5.1"] |["High", "low"]|["GAR15"] |["15","14"]|
+------------------+--------------+---------------+----------+-----------+
Assuming you want to explode all ArrayType columns (otherwise, filter accordingly):
val df = Seq(
(1, "xx", Seq(10, 20), Seq("a", "b"), Seq("p", "q")),
(2, "yy", Seq(30, 40), Seq("c", "d"), Seq("r", "s"))
).toDF("c1", "c2", "a1", "a2", "a3")
import org.apache.spark.sql.functions.{arrays_zip, col, explode_outer}
import org.apache.spark.sql.types.{StructField, ArrayType}
val arrCols = df.schema.fields
.collect{case StructField(name, _: ArrayType, _, _) => name}
.map(col)
val otherCols = df.columns.map(col) diff arrCols
df.withColumn("arr_zip", explode_outer(arrays_zip(arrCols: _*)))
.select(otherCols.toList ::: $"arr_zip.*" :: Nil: _*)
.show
+---+---+---+---+---+
| c1| c2| a1| a2| a3|
+---+---+---+---+---+
| 1| xx| 10| a| p|
| 1| xx| 20| b| q|
| 2| yy| 30| c| r|
| 2| yy| 40| d| s|
+---+---+---+---+---+
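Since your array columns change from file to file, it may help to wrap the steps above in a function that reads the schema at runtime. The following is only a minimal sketch: the name explodeArrayColumns and the temporary column name arr_zip are my own choices (hypothetical, not part of Spark). It assumes that no existing column is already called arr_zip, that column names contain no dots, and that the struct fields produced by arrays_zip keep the source column names, as in the output above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{arrays_zip, col, explode_outer}
import org.apache.spark.sql.types.ArrayType

// Hypothetical helper: explodes every ArrayType column found in the schema
// and keeps all other columns unchanged.
def explodeArrayColumns(df: DataFrame): DataFrame = {
  // Split the fields by data type, using the schema of the DataFrame at hand.
  val (arrayFields, otherFields) =
    df.schema.fields.partition(_.dataType.isInstanceOf[ArrayType])
  val arrayNames = arrayFields.map(_.name)
  val otherCols = otherFields.map(f => col(f.name))

  if (arrayNames.isEmpty) {
    df // nothing to explode
  } else {
    // Zip the arrays element-wise into an array of structs, then emit one row per struct.
    val zipped = explode_outer(arrays_zip(arrayNames.map(col): _*))
    df.withColumn("arr_zip", zipped) // "arr_zip" is an assumed-free temporary name
      // Pull each struct field back out under its original column name.
      .select(otherCols ++ arrayNames.map(n => col(s"arr_zip.$n").as(n)): _*)
  }
}
```

Calling explodeArrayColumns(df).show on the example DataFrame above should give the same rows as the output shown. Note that arrays_zip pads shorter arrays with null when the arrays in a row have different lengths, and explode_outer keeps rows whose zipped array is null or empty, so check that this is what you want for your data.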