Tags: dataframe, apache-spark, pyspark, user-defined-functions

pyspark - explode a dataframe col, which contains json


I have a column 'user_contacts_attributes' in a Spark dataframe:

+---------+------------------------------------------------------+
|user_name|user_contacts_attributes                              |
+---------+------------------------------------------------------+
|Test     |"user": { "id": "16","username":"sam","level":"2.00"} |
+---------+------------------------------------------------------+

The dataframe has the following schema:

user_name:string
user_contacts_attributes:struct
    user:struct
        id:string
        level:string
        username:string
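
For reference, a dataframe matching this schema can be created like this (a minimal sketch; the SparkSession setup is assumed):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # nested dicts map onto the struct fields when an explicit schema is given
    df = spark.createDataFrame(
        [("Test", {"user": {"id": "16", "level": "2.00", "username": "sam"}})],
        "user_name string, "
        "user_contacts_attributes struct<user: struct<id: string, level: string, username: string>>"
    )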

The resultant dataframe should look like this:

+---------+------+--------+-----+
|user_name|parent|child   |value|
+---------+------+--------+-----+
|Test     |user  |id      |16   |
|Test     |user  |level   |2.00 |
|Test     |user  |username|sam  |
+---------+------+--------+-----+

I have tried writing a UDF similar to the one in PySpark "explode" dict in column, but failed. I need a UDF that I can apply to each of these dataframe rows.


Solution

  • Use an SQL expression to create a new column containing an array of named_structs, where each struct contains the field name and the field value of one JSON element:

    from pyspark.sql import functions as F

    df = ...  # the dataframe from above

    base = 'user_contacts_attributes.user'  # according to the schema, 'user' is a fixed string
    # get the names of all json elements:
    fields = df.select(f'{base}.*').schema.fieldNames()
    # create the expression for the array
    expr = "array(" + \
        ",".join([f'named_struct("child", "{field}", "value", {base}.{field})' for field in fields]) \
        + ")"


    expr now contains the string

    array(
      named_struct("child", "id", "value", user_contacts_attributes.user.id),
      named_struct("child", "level", "value", user_contacts_attributes.user.level),
      named_struct("child", "username", "value", user_contacts_attributes.user.username)
    )
    

    Finally, explode the array and remove the intermediate columns:

    (df.withColumn('parent', F.lit('user'))           # the parent key is a fixed string
       .withColumn('values', F.expr(expr))            # build the array of structs
       .withColumn('values', F.explode("values"))     # one row per struct
       .withColumn("child", F.col("values.child"))
       .withColumn("value", F.col("values.value"))
       .drop("user_contacts_attributes", "values")
       .show(truncate=False))
    

    Output:

    +---------+------+--------+-----+
    |user_name|parent|child   |value|
    +---------+------+--------+-----+
    |Test     |user  |id      |16   |
    |Test     |user  |level   |2.00 |
    |Test     |user  |username|sam  |
    +---------+------+--------+-----+
    

    If possible, UDFs should be avoided, as they are slower than Spark's built-in SQL functions.
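
    For comparison, the same rows can be produced without building a SQL string, using the DataFrame API directly; this is a sketch under the same assumption that 'user' is the fixed parent key:

    from pyspark.sql import functions as F

    base = 'user_contacts_attributes.user'
    # one struct('child', 'value') per field under 'user'
    structs = [
        F.struct(F.lit(f).alias('child'), F.col(f'{base}.{f}').alias('value'))
        for f in df.select(f'{base}.*').schema.fieldNames()
    ]

    df.withColumn('parent', F.lit('user')) \
      .withColumn('kv', F.explode(F.array(*structs))) \
      .select('user_name', 'parent', 'kv.child', 'kv.value') \
      .show(truncate=False)

    Both variants rely only on built-in functions, so neither needs a UDF.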