python apache-spark pyspark apache-spark-xml

Spark: How to transform data from multiple nested XML files with attributes into a DataFrame


How can I transform the values below from multiple XML files into a Spark DataFrame?

Required output:

+----------------+-------------+---------+
|Id0             |Date         |Value    |
+----------------+-------------+---------+
|Id0_value_file1 |  2021-01-01 |   4_1   |
|Id0_value_file1 |  2021-01-02 |   4_2   |
|Id0_value_file2 |  2021-01-01 |   4_1   |
|Id0_value_file2 |  2021-01-02 |   4_2   |
+----------------+-------------+---------+

file_1.xml:

<Level_0 Id0="Id0_value_file1">
  <Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
    <Level_2_A>A</Level_2_A>
    <Level_2>
      <Level_3>
        <Level_4>
          <Date>2021-01-01</Date>
          <Value>4_1</Value>
        </Level_4>
        <Level_4>
          <Date>2021-01-02</Date>
          <Value>4_2</Value>
        </Level_4>
      </Level_3>
    </Level_2>
  </Level_1>
</Level_0>

file_2.xml:

<Level_0 Id0="Id0_value_file2">
  <Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
    <Level_2_A>A</Level_2_A>
    <Level_2>
      <Level_3>
        <Level_4>
          <Date>2021-01-01</Date>
          <Value>4_1</Value>
        </Level_4>
        <Level_4>
          <Date>2021-01-02</Date>
          <Value>4_2</Value>
        </Level_4>
      </Level_3>
    </Level_2>
  </Level_1>
</Level_0>

Current code:

files_list = ["file_1.xml", "file_2.xml"]
df = (spark.read.format('xml')
           .options(rowTag="Level_4")
           .load(','.join(files_list)))

Current output (the Id0 column, which comes from an attribute, is missing):

+-------------+---------+
|Date         |Value    |
+-------------+---------+
|  2021-01-01 |     4_1 |
|  2021-01-02 |     4_2 |
|  2021-01-01 |     4_1 |
|  2021-01-02 |     4_2 |
+-------------+---------+

There are some examples, but none of them solves the problem:
- I'm using the Databricks spark-xml package: https://github.com/databricks/spark-xml
- There are examples, but none that reads attributes: "Read XML in spark", "Extracting tag attributes from xml using sparkxml".

EDIT: As @mck correctly pointed out, <Level_2>A</Level_2> was a mistake in my example XML (the files above are now corrected); it should be <Level_2_A>A</Level_2_A>. With that fixed, the proposed solution works, even on multiple files.

NOTE: To speed up loading a large number of XML files, define a schema. If no schema is defined, Spark reads every file when creating the DataFrame in order to infer the schema. For more info: https://szczeles.github.io/Reading-JSON-CSV-and-XML-files-efficiently-in-Apache-Spark/
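One way to write such a schema is as a DDL string. This is only a sketch: the field names are copied from the printSchema() output shown in STEP 1, and it assumes spark-xml's default attribute prefix "_" (hence _Id0, _Id1_1, _Id_2). Adjust it if your files differ.

```python
# Explicit schema as a DDL string, mirroring the printSchema() output of STEP 1.
# Assumption: attributes carry spark-xml's default "_" prefix.
# Field names are backtick-quoted so that names such as Date are not parsed as keywords.
schema = (
    "`_Id0` STRING, "
    "`Level_1` STRUCT<"
        "`Level_2`: STRUCT<"
            "`Level_3`: STRUCT<"
                "`Level_4`: ARRAY<STRUCT<`Date`: STRING, `Value`: STRING>>"
            ">"
        ">, "
        "`Level_2_A`: STRING, "
        "`_Id1_1`: STRING, "
        "`_Id_2`: STRING"
    ">"
)
```

The string can then be passed as schema=schema to load(...), as in STEP 1. An equivalent StructType built from pyspark.sql.types would work as well.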

STEP 1):

files_list = ["file_1.xml", "file_2.xml"]
# for the schema, see the NOTE above

df = (spark.read.format('xml')
           .options(rowTag="Level_0")
           .load(','.join(files_list), schema=schema))
df.printSchema()

root
 |-- Level_1: struct (nullable = true)
 |    |-- Level_2: struct (nullable = true)
 |    |    |-- Level_3: struct (nullable = true)
 |    |    |    |-- Level_4: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- Date: string (nullable = true)
 |    |    |    |    |    |-- Value: string (nullable = true)
 |    |-- Level_2_A: string (nullable = true)
 |    |-- _Id1_1: string (nullable = true)
 |    |-- _Id_2: string (nullable = true)
 |-- _Id0: string (nullable = true)

STEP 2) See @mck's solution below:


Solution

  • You can use Level_0 as the rowTag, and explode the relevant arrays/structs:

    import pyspark.sql.functions as F
    
    df = spark.read.format('xml').options(rowTag="Level_0").load('line_removed.xml')
    
    df2 = df.select(
        '_Id0', 
        F.explode_outer('Level_1.Level_2.Level_3.Level_4').alias('Level_4')
    ).select(
        '_Id0',
        'Level_4.*'
    )
    
    df2.show()
    +---------------+----------+-----+
    |           _Id0|      Date|Value|
    +---------------+----------+-----+
    |Id0_value_file1|2021-01-01|  4_1|
    |Id0_value_file1|2021-01-02|  4_2|
    +---------------+----------+-----+
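
To get the column names of the required output (Id0 instead of _Id0) and to reuse the logic on a DataFrame loaded from several files, the same idea could be wrapped in a small helper. This is a sketch under assumptions: flatten_level_4 is a hypothetical name, and it uses selectExpr with SQL expressions instead of pyspark.sql.functions, which should behave the same way here.

```python
def flatten_level_4(df):
    """Return one row per Level_4 element with the columns Id0, Date, Value.

    Assumes df was read with rowTag="Level_0", so it has the _Id0 attribute
    column and the nested Level_1.Level_2.Level_3.Level_4 array.
    """
    exploded = df.selectExpr(
        "_Id0 AS Id0",  # drop the attribute prefix to match the required output
        "explode_outer(Level_1.Level_2.Level_3.Level_4) AS Level_4",
    )
    # Expand the Level_4 struct into its Date and Value columns.
    return exploded.select("Id0", "Level_4.*")
```

With a DataFrame read from both files using rowTag="Level_0", flatten_level_4(df).show() should then list the Level_4 rows of every file next to that file's Id0 value.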