I need to add a new column to a DataFrame (DynamicFrame) based on JSON data from another column. What's the best way to do it?
Before:

| id | name | customJson |
|---|---|---|
| 1 | John | {'key':'lastName','value':'Smith'} |

After:

| id | name | lastName | customJson |
|---|---|---|---|
| 1 | John | Smith | {'key':'lastName','value':'Smith'} |
I tried withColumn, but I'm not sure how to extract the value from the JSON and compute the new column from it.
DynamicFrames don't offer a convenient way to do this kind of transformation, so you will need to convert the DynamicFrame to a PySpark DataFrame with .toDF() and then, after the transformation, convert it back with DynamicFrame.fromDF().
Here is an example of how to parse it with a PySpark DataFrame:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder \
    .master("local") \
    .appName("Parsing JSON") \
    .getOrCreate()

df = spark.createDataFrame(
    [(1, "John", "{'key':'lastName','value':'Smith'}")],
    ['id', 'name', 'customJson'],
)

# Schema of the JSON object stored in customJson.
# Spark's JSON parser accepts single-quoted strings by default (allowSingleQuotes).
schema = StructType([
    StructField('key', StringType()),
    StructField('value', StringType()),
])

# Parse the JSON string and expose its 'value' field as the new lastName column
df = df.select(
    df.id,
    df.name,
    from_json(df.customJson, schema).value.alias('lastName'),
    df.customJson,
)
```
Documentation for from_json: https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.from_json.html
If your JSON is an array of objects, you need to wrap the struct in ArrayType in the schema and then use the explode function to flatten out the values you need, one row per array element.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, explode
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

spark = SparkSession.builder \
    .master("local") \
    .appName("Parsing JSON array") \
    .getOrCreate()

df = spark.createDataFrame(
    [(1, "John", "[{'key':'lastName','value':'Smith'},{'key':'lastName','value':'Silva'}]")],
    ['id', 'name', 'customJson'],
)

# The JSON is now an array of {key, value} objects
schema = ArrayType(StructType([
    StructField('key', StringType()),
    StructField('value', StringType()),
]))

# explode() emits one output row per array element
df = df.select(
    df.id,
    df.name,
    explode(from_json(df.customJson, schema)).alias('parsedJson'),
    df.customJson,
)

# to_markdown (requires the tabulate package) formats the result as a markdown table
print(df.select(
    df.id,
    df.name,
    df.parsedJson.value.alias('lastName'),
).toPandas().to_markdown())
```
| | id | name | lastName |
|---|---|---|---|
| 0 | 1 | John | Smith |
| 1 | 1 | John | Silva |