Tags: python, json, pyspark

PySpark join fields in JSON to a dataframe


I am trying to pull out some fields from a JSON string into a dataframe. I can achieve this by putting each field in a dataframe and then joining all the dataframes, like below. But is there some easier way to do this? This is just a simplified example, and I have a lot more fields to extract in my project.

    import json

    from pyspark.sql import Row

    # Sample payload: a single job id plus a nested list of tasks.
    s = '{"job_id":"123","settings":{"task":[{"taskname":"task1"},{"taskname":"task2"}]}}'

    json_object = json.loads(s)

    # json_object

    # Single-row dataframe holding just the job id.
    job_id_l = [Row(job_id=json_object['job_id'])]
    job_id_df = spark.createDataFrame(job_id_l)
    # display(job_id_df)

    # One row per task name, built from the nested task list.
    tasknames = [Row(taskname=t["taskname"]) for t in json_object['settings']["task"]]

    tasknames_df = spark.createDataFrame(tasknames)
    # display(tasknames_df)

    # Cross join pairs the lone job id with every task name.
    job_id_df.crossJoin(tasknames_df).display()

Result:

    job_id  taskname
    123 task1
    123 task2

Solution

  • Update1:

    You don't even have to define the schema explicitly here; instead, you may simply use `schema_of_json` as follows:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col, explode, schema_of_json, lit
    
    spark = SparkSession.builder.getOrCreate()
    
    s = '{"job_id":"123","settings":{"task":[{"taskname":"task1"},{"taskname":"task2"}]}}'
    
    # Let Spark infer the struct schema straight from the sample JSON string.
    schema = schema_of_json(lit(s))
    
    # First parse the raw JSON column, then flatten job_id next to each task name.
    parsed = spark.createDataFrame([s], "string").select(
        from_json(col("value"), schema).alias("data")
    )
    result_df = parsed.select(
        "data.job_id",
        explode("data.settings.task.taskname").alias("taskname"),
    )
    
    result_df.show()
    
    # +------+--------+
    # |job_id|taskname|
    # +------+--------+
    # |   123|   task1|
    # |   123|   task2|
    # +------+--------+
    
    

    As you mentioned there are a lot more fields - this will take away some work from your hands.


    Original Answer:

    An easier way is to mirror the schema of the JSON string `s` and use `from_json` as follows:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, col, explode
    
    spark = SparkSession.builder.getOrCreate()
    
    s = '{"job_id":"123","settings":{"task":[{"taskname":"task1"},{"taskname":"task2"}]}}'
    
    # Hand-written DDL schema that mirrors the JSON layout.
    schema = "struct<job_id:string, settings:struct<task:array<struct<taskname:string>>>>"
    
    raw_df = spark.createDataFrame([s], "string")
    parsed_df = raw_df.select(from_json(col("value"), schema).alias("data"))
    # explode() emits one output row per element of the taskname array.
    result_df = parsed_df.select(
        "data.job_id",
        explode("data.settings.task.taskname").alias("taskname"),
    )
    
    result_df.show()
    
    # +------+--------+
    # |job_id|taskname|
    # +------+--------+
    # |   123|   task1|
    # |   123|   task2|
    # +------+--------+