pyspark, aws-glue, aws-glue-spark

Adding a column to a DataFrame


I need to add a new column to a DataFrame (DynamicFrame) based on JSON data in another column. What's the best way to do it?

schema:

'id' 'name' 'customJson'
--------------------------
1, John, {'key':'lastName','value':'Smith'}

after:

'id' 'name' 'lastName' 'customJson'
-----------------------------------
1, John, Smith, {'key':'lastName','value':'Smith'}

I tried withColumn, but I'm not sure how to extract and compute the new value from the JSON column.


Solution

  • DynamicFrames won't let you do this level of transformation, so you will need to convert to a PySpark DataFrame using the .toDF() method and then, after the transformation, convert back with DynamicFrame.fromDF().
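
    In a Glue job, that round trip looks roughly like this (a sketch, assuming a standard Glue job with a `glueContext` and an existing DynamicFrame named `dyf`; note that `fromDF` takes the GlueContext and a name as extra arguments):

    ```python
    from awsglue.dynamicframe import DynamicFrame

    # Cast the DynamicFrame to a plain Spark DataFrame
    df = dyf.toDF()

    # ... apply the JSON-parsing transformation here ...

    # Cast back to a DynamicFrame for the rest of the Glue job
    dyf_out = DynamicFrame.fromDF(df, glueContext, "dyf_out")
    ```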

    Here is an example on how to parse it with PySpark DataFrame:

    Creating a DataFrame as example

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .master("local") \
        .appName("Parsing JSON") \
        .getOrCreate()

    df = spark.createDataFrame(
        [(1, "John", "{'key':'lastName','value':'Smith'}")],
        ['id', 'name', 'customJson']
    )
    
    

    Now Parsing the JSON Column

    from pyspark.sql.types import StructType, StructField, StringType
    from pyspark.sql.functions import from_json
    schema = StructType([StructField('key', StringType()),StructField('value', StringType())])
     
    df = df.select(
        df.id,
        df.name,
        from_json(df.customJson, schema).value.alias('lastName'),
        df.customJson
    )
    

    Feel free to run this notebook if you like.

    Here is some documentation: https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.from_json.html
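
    If you only need to pull out a single field, an alternative to from_json is get_json_object, which extracts a value by JSONPath without declaring a schema (a sketch using the same sample data):

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import get_json_object

    spark = SparkSession.builder \
        .master("local") \
        .appName("Parsing JSON") \
        .getOrCreate()

    df = spark.createDataFrame(
        [(1, "John", "{'key':'lastName','value':'Smith'}")],
        ['id', 'name', 'customJson']
    )

    # '$.value' is a JSONPath expression selecting the 'value' field
    df = df.select(
        df.id,
        df.name,
        get_json_object(df.customJson, '$.value').alias('lastName'),
        df.customJson
    )
    ```

    The trade-off: get_json_object is convenient for one-off lookups, while from_json gives you typed struct columns you can reuse.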

    [EDIT]

    If your JSON represents an array of elements (as in the comment below), you need to wrap the schema in an ArrayType, then use the explode function to flatten out the values you need.

    Creating DataFrame as example

    
    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .master("local") \
        .appName("Parsing JSON Array") \
        .getOrCreate()

    df = spark.createDataFrame(
        [(1, "John", "[{'key':'lastName','value':'Smith'},{'key':'lastName','value':'Silva'}]")],
        ['id', 'name', 'customJson']
    )
    
    

    Parsing Json representing array of elements

    from pyspark.sql.types import StructType, StructField, StringType, ArrayType
    from pyspark.sql.functions import from_json, explode
    schema = ArrayType(StructType([StructField('key', StringType()),StructField('value', StringType())]))
     
    df = df.select(
        df.id,
        df.name,
        explode(
            from_json(df.customJson, schema)
        ).alias('parsedJson'),
        df.customJson
    )
    
    df.select(
        df.id,
        df.name,
        df.parsedJson.value.alias("lastName"),
    ).toPandas().to_markdown() 
    
    # to_markdown properly formats the DataFrame as markdown to print below as table
    
    |    | id | name | lastName |
    |---:|---:|:-----|:---------|
    |  0 |  1 | John | Smith    |
    |  1 |  1 | John | Silva    |
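
    If the array can carry several different keys (not just lastName), one option is to convert it into a map with map_from_entries and look fields up by key. A sketch, using hypothetical sample data where each entry has a distinct 'key' (duplicate map keys raise an error on recent Spark versions):

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, ArrayType
    from pyspark.sql.functions import from_json, map_from_entries

    spark = SparkSession.builder \
        .master("local") \
        .appName("Parsing JSON") \
        .getOrCreate()

    # Hypothetical sample where the array mixes different keys
    df = spark.createDataFrame(
        [(1, "John", "[{'key':'lastName','value':'Smith'},{'key':'city','value':'Austin'}]")],
        ['id', 'name', 'customJson']
    )

    schema = ArrayType(StructType([
        StructField('key', StringType()),
        StructField('value', StringType()),
    ]))

    # map_from_entries turns the array of {key, value} structs into a map,
    # so each key becomes directly addressable
    df = df.select(
        df.id,
        df.name,
        map_from_entries(from_json(df.customJson, schema)).alias('attrs'),
    )

    df = df.select(
        df.id,
        df.name,
        df.attrs['lastName'].alias('lastName'),
        df.attrs['city'].alias('city'),
    )
    ```

    Compared to explode, this keeps one row per record, which is usually what you want when turning JSON attributes into columns.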