When I use `DataFrame.replace`, it doesn't replace values that are nested inside a struct column.
In this example, it doesn't replace the value of `my_struct.struct_string`:
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructType, StructField
glueContext = GlueContext(SparkContext.getOrCreate())
# The snippet uses `spark` below but never defined it; take the SparkSession
# from the Glue context so the repro actually runs.
spark = glueContext.spark_session

# One row: a top-level string and a struct holding a string, both the literal "null".
data = [
    ("null", {"struct_string": "null"}),
]
schema = StructType([
    StructField("a_string", StringType(), True),
    StructField(
        "my_struct",
        StructType([
            StructField("struct_string", StringType(), True),
        ]),
        True,
    ),
])
df = spark.createDataFrame(data, schema)

# As the output below shows, replace() rewrites the top-level column but leaves
# the value inside my_struct untouched.
df = df.replace("null", None)

df_astring = df.filter(col("a_string").isNotNull())
df_struct_string = df.filter(col("my_struct.struct_string").isNotNull())
print("My df_astring")
df_astring.show()
print("My df_struct_string")
df_struct_string.show()
It prints:
My df_astring
+--------+---------+
|a_string|my_struct|
+--------+---------+
+--------+---------+
My df_struct_string
+--------+---------+
|a_string|my_struct|
+--------+---------+
| null| {null}|
+--------+---------+
Note that I also tried `df = df.replace("null", None, ["a_string", "my_struct.struct_string"])`, but I get the exception `java.lang.UnsupportedOperationException: Nested field my_struct.struct_string is not supported`.
The solution needs to be dynamic: it must not require manually specifying which columns are strings.
The expected output is:
My df_astring
+--------+---------+
|a_string|my_struct|
+--------+---------+
+--------+---------+
My df_struct_string
+--------+---------+
|a_string|my_struct|
+--------+---------+
+--------+---------+
@samhita's answer works, but it is much slower when the dataset is big.
The version below also adds support for lists.
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructType, StructField
from typing import Any
spark = SparkSession.builder.appName("replace_null_example").getOrCreate()

# Sample DataFrame: five identical rows in which every string value is the
# literal text "null".
data = [
    ("null", {"struct_string": "null", "nested_struct": {"nested_field": "null"}}),
    ("null", {"struct_string": "null", "nested_struct": {"nested_field": "null"}}),
    ("null", {"struct_string": "null", "nested_struct": {"nested_field": "null"}}),
    ("null", {"struct_string": "null", "nested_struct": {"nested_field": "null"}}),
    ("null", {"struct_string": "null", "nested_struct": {"nested_field": "null"}}),
]

# Schema with string fields at three depths:
#   a_string                              top-level column
#   my_struct.struct_string               string inside a struct
#   my_struct.nested_struct.nested_field  string inside a struct inside a struct
schema = StructType([
    StructField("a_string", StringType(), True),
    StructField(
        "my_struct",  # struct column
        StructType([
            StructField(
                "struct_string",  # plain string field of my_struct
                StringType(),
                True,
            ),
            StructField(
                "nested_struct",  # struct nested inside my_struct
                StructType([
                    StructField("nested_field", StringType(), True),
                ]),
                True,
            ),
        ]),
        True,
    ),
])

df = spark.createDataFrame(data, schema)
def _replace_recursively(data: Any, to_replace: Any, value: Any) -> Any:
    """Return *data* with every leaf equal to *to_replace* swapped for *value*.

    Dicts are rebuilt with the same keys and transformed values (keys are never
    replaced); lists are rebuilt element by element; any other object is
    compared with ``==`` and replaced on a match.
    """
    if isinstance(data, dict):
        return {key: _replace_recursively(item, to_replace, value) for key, item in data.items()}
    if isinstance(data, list):
        return [_replace_recursively(item, to_replace, value) for item in data]
    return value if data == to_replace else data


def remplace_null_values(
    line: Row,
    to_replace: Any = "null",
    value: Any = None,
) -> dict[str, Any]:
    """Convert a Row to a dict, replacing *to_replace* with *value* at any depth.

    ``Row.asDict(True)`` turns nested Rows into dicts, so values inside struct
    columns (and inside lists) are reached as well. This is what plain
    ``DataFrame.replace`` does not do.

    Args:
        line: the row to convert.
        to_replace: the value to look for (defaults to the string ``"null"``).
        value: the replacement (defaults to ``None``).

    Returns:
        A dict keyed by column name, suitable for ``rdd.map(...).toDF(schema)``.
    """
    # The helper lives at module level so it isn't re-created for every row.
    return _replace_recursively(line.asDict(True), to_replace, value)


# Correctly spelled alias; the original name is kept for existing callers.
replace_null_values = remplace_null_values
# Round-trip through the RDD so every row passes through remplace_null_values,
# then rebuild the DataFrame with the original schema.
new_df: DataFrame = df.rdd.map(remplace_null_values).toDF(df.schema)

# Keep only rows whose (possibly nested) column is still non-null after replacement.
df_astring = new_df.where(col("a_string").isNotNull())
df_struct_string = new_df.where(col("my_struct.nested_struct.nested_field").isNotNull())

for title, frame in (("My df_astring", df_astring), ("My df_struct_string", df_struct_string)):
    print(title)
    frame.show(truncate=False)