python dataframe pyspark

Edit a column that is nested in an array that is nested in a struct


How can I edit the I column of my DataFrame (nested at C.F.I) by applying my example_loop function to it?

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType


@udf(returnType=IntegerType())
def example(i, a):
    """Return ``i`` increased by 5 when ``a`` equals "querty", otherwise by 10.

    NOTE(review): the sample row built in ``main`` uses 'qwerty', which does
    not equal "querty" -- confirm the literal below is intended.
    """
    increment = 5 if a == "querty" else 10
    return i + increment


@udf(returnType=ArrayType(IntegerType()))
def example_loop(i_elements, a):
    """Call ``example(element, a)`` for each element of ``i_elements``.

    Returns the results as a list, in the same order as ``i_elements``.

    NOTE(review): ``example`` is itself decorated with ``@udf`` in this file,
    so the call below is not a plain Python function call -- confirm it can be
    invoked from inside another UDF.
    """
    updated = []
    for value in i_elements:
        updated.append(example(value, a))
    return updated



def main():
    """Build a one-row DataFrame with a nested struct column and update it twice.

    The frame is shown three times: as created, after ``C.K`` is replaced by
    ``example(C.K, A)``, and after ``C.F`` is replaced by
    ``example_loop(C.F.I, A)``.
    """
    session = SparkSession.builder.getOrCreate()

    # Row layout mirrors the schema below: A, B, C=(D, E, F=[(G, H, I)], K).
    rows = [
        ('qwerty', 'ytrewq', ('Jon', 'Smith', [('huhu', 'haha', 14)], 20)),
    ]

    # Element type of the array stored in C.F.
    f_item = StructType([
        StructField("G", StringType()),
        StructField("H", StringType()),
        StructField("I", IntegerType()),
    ])
    # Struct stored in column C.
    c_struct = StructType([
        StructField("D", StringType()),
        StructField("E", StringType()),
        StructField("F", ArrayType(f_item)),
        StructField("K", IntegerType()),
    ])
    schema = StructType([
        StructField("A", StringType()),
        StructField("B", StringType()),
        StructField("C", c_struct),
    ])

    frame: DataFrame = session.createDataFrame(data=rows, schema=schema)
    frame.show(truncate=False)

    # Replace field K of struct C with example(C.K, A).
    new_k = example(col("C.K"), col("A"))
    frame = frame.withColumn("C", col("C").withField("K", new_k))
    frame.show(truncate=False)

    # Replace field F of struct C with example_loop(C.F.I, A).
    new_f = example_loop(col("C.F.I"), col("A"))
    frame = frame.withColumn("C", col("C").withField("F", new_f))
    frame.show(truncate=False)

# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

Solution

  • To fix your code, first change the example_loop UDF so that it takes the whole array of structs (rows) as input and returns the transformed array of structs, with only the I values changed. Also turn example into a plain Python function instead of a UDF, because a UDF cannot normally be called from inside another UDF.

    Code

    # Return type of example_loop: an array of structs with fields G, H and I
    f_schema = ArrayType(
        StructType()
        .add("G", StringType())
        .add("H", StringType())
        .add("I", IntegerType())
    )
    
    # Plain Python helper: this must not be a UDF, because it is called from
    # inside the body of the example_loop UDF.
    def example(i, a):
        """Return ``i`` plus 5 when ``a`` equals "querty", otherwise ``i`` plus 10.

        Returns ``None`` when ``i`` is ``None``.
        """
        if i is None:
            # A null integer reaches Python as None; adding to it would raise
            # TypeError, so keep the value null instead.
            return None
        return i + (5 if a == "querty" else 10)
    
    # See the changes in return type as well as the logic
    from pyspark.sql import Row  # the annotation below is evaluated when the def runs

    @udf(returnType=f_schema)
    def example_loop(rows: list[Row], a):
        """Return a copy of ``rows`` in which each struct's ``I`` is ``example(I, a)``.

        rows: the array of structs passed in for the column (``C.F`` in the
            call below); each element is read via ``asDict`` and its ``I``
            attribute.
        a: value forwarded unchanged to ``example`` for every element.

        A null array is returned as ``None`` and null elements stay ``None``,
        so missing data does not raise inside the UDF.
        """
        if rows is None:
            return None
        return [
            # Copy every field of the struct, then override only I.
            None if r is None else {**r.asDict(True), 'I': example(r.I, a)}
            for r in rows
        ]
    
    # Pass the whole array of structs (C.F), not just C.F.I, so G and H survive
    updated_f = example_loop(col("C.F"), col('A'))
    df = df.withColumn("C", col("C").withField("F", updated_f))
    df.show(truncate=False)
    

    Result

    +------+------+--------------------------------+
    |A     |B     |C                               |
    +------+------+--------------------------------+
    |qwerty|ytrewq|{Jon, Smith, [{huhu, haha, 24}]}|
    +------+------+--------------------------------+