python pyspark

How to replace a value in every column, including fields nested in a struct


When I use DataFrame.replace, it doesn't replace the values that are in a structure.

In this example, it doesn't replace the value of my_struct.struct_string:

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructType, StructField

# Glue entry point wrapping the active SparkContext.
glueContext = GlueContext(SparkContext.getOrCreate())
# `spark` was used below without ever being defined; take the session from the
# GlueContext so the script also runs outside a pre-initialised notebook.
spark = glueContext.spark_session

# One row holding the literal string "null" both at the top level and inside
# the struct.
data = [
    ("null", {"struct_string": "null"}),
]

schema = StructType([
    StructField("a_string", StringType(), True),
    StructField(
        "my_struct",
        StructType([
            StructField("struct_string", StringType(), True),
        ]),
        True,
    ),
])


df = spark.createDataFrame(data, schema)


# This is the call under discussion: it nulls out "null" in the top-level
# string column, but my_struct.struct_string keeps its value (see the printed
# output that follows).
df = df.replace("null", None)

# Rows surviving each filter still carry a non-null value in that field.
df_astring = df.filter(col("a_string").isNotNull())
df_struct_string = df.filter(col("my_struct.struct_string").isNotNull())

print("My df_astring")
df_astring.show()
print("My df_struct_string")
df_struct_string.show()

It prints:

My df_astring
+--------+---------+
|a_string|my_struct|
+--------+---------+
+--------+---------+

My df_struct_string
+--------+---------+
|a_string|my_struct|
+--------+---------+
|    null|   {null}|
+--------+---------+

Note that I also tried df = df.replace("null", None, ["a_string", "my_struct.struct_string"]), but I get the exception java.lang.UnsupportedOperationException: Nested field my_struct.struct_string is not supported.

The solution needs to be dynamic: it must not require manually listing the names of the string columns.

The expected output is:

My df_astring
+--------+---------+
|a_string|my_struct|
+--------+---------+
+--------+---------+

My df_struct_string
+--------+---------+
|a_string|my_struct|
+--------+---------+
+--------+---------+

Solution

  • @samhita's answer works, but it is much slower on large datasets.

    I also added support for lists.

    from pyspark.sql import DataFrame, Row, SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import StringType, StructType, StructField
    from typing import Any
    
    # Obtain a session (an already-running one is reused by getOrCreate).
    spark = SparkSession.builder.appName("replace_null_example").getOrCreate()

    # Sample rows: five identical records in which every string is the
    # literal "null".
    data = [
        ("null", {"struct_string": "null", "nested_struct": {"nested_field": "null"}})
        for _ in range(5)
    ]

    # Schema, assembled innermost-first:
    #   a_string                              string
    #   my_struct.struct_string               string
    #   my_struct.nested_struct.nested_field  string
    nested_struct_type = StructType([
        StructField("nested_field", StringType(), True),
    ])
    my_struct_type = StructType([
        StructField("struct_string", StringType(), True),
        StructField("nested_struct", nested_struct_type, True),
    ])
    schema = StructType([
        StructField("a_string", StringType(), True),
        StructField("my_struct", my_struct_type, True),
    ])

    df = spark.createDataFrame(data, schema)
    
    def remplace_null_values(line: Row) -> dict[str, Any]:
        """Return the row as a plain dict with every "null" string set to None.

        The row is converted with ``asDict(True)`` (recursive conversion) and
        the result is walked through nested dicts and lists. Any value other
        than the exact string "null" is returned unchanged.
        """

        def scrub(value):
            # Containers are rebuilt so that their contents get cleaned too.
            if isinstance(value, dict):
                return {name: scrub(item) for name, item in value.items()}
            if isinstance(value, list):
                return [scrub(item) for item in value]
            # Leaf value: only the literal "null" marker is swapped for None.
            return None if value == "null" else value

        return scrub(line.asDict(True))
    
    # Clean every row on the underlying RDD, then rebuild a DataFrame using
    # the original schema.
    cleaned_rows = df.rdd.map(remplace_null_values)
    new_df: DataFrame = cleaned_rows.toDF(df.schema)

    # Each filter keeps only the rows where the given field is still non-null.
    a_string_is_set = col("a_string").isNotNull()
    nested_field_is_set = col("my_struct.nested_struct.nested_field").isNotNull()
    df_astring = new_df.filter(a_string_is_set)
    df_struct_string = new_df.filter(nested_field_is_set)

    for title, frame in (
        ("My df_astring", df_astring),
        ("My df_struct_string", df_struct_string),
    ):
        print(title)
        frame.show(truncate=False)