How can I edit the `I` column of my DataFrame by applying my `example_loop` function to it?
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType
@udf(returnType=IntegerType())
def example(i, a):
    """Return ``i`` increased by 5 when ``a`` equals "querty", otherwise by 10."""
    step = 5 if a == "querty" else 10
    i += step
    return i
@udf(returnType=ArrayType(IntegerType()))
def example_loop(i_elements, a):
    """Call ``example`` on every element of ``i_elements``, passing the same ``a`` each time."""
    # NOTE: ``example`` is itself declared as a UDF; the accompanying answer
    # points out that a UDF cannot normally be called from inside another UDF.
    return [example(value, a) for value in i_elements]
def main():
    """Build a one-row nested DataFrame, apply both UDFs to it and show each stage."""
    spark = SparkSession.builder.getOrCreate()

    # Element type of the array stored in C.F.
    f_element = StructType([
        StructField("G", StringType()),
        StructField("H", StringType()),
        StructField("I", IntegerType()),
    ])
    # Struct stored in column C.
    c_struct = StructType([
        StructField("D", StringType()),
        StructField("E", StringType()),
        StructField("F", ArrayType(f_element)),
        StructField("K", IntegerType()),
    ])
    schema = StructType([
        StructField("A", StringType()),
        StructField("B", StringType()),
        StructField("C", c_struct),
    ])

    rows = [
        ("qwerty", "ytrewq", ("Jon", "Smith", [("huhu", "haha", 14)], 20)),
    ]
    df: DataFrame = spark.createDataFrame(data=rows, schema=schema)
    df.show(truncate=False)

    # Replace field K of C with example(C.K, A).
    new_k = example(col("C.K"), col("A"))
    df = df.withColumn("C", col("C").withField("K", new_k))
    df.show(truncate=False)

    # Replace field F of C with example_loop(C.F.I, A).
    new_f = example_loop(col("C.F.I"), col("A"))
    df = df.withColumn("C", col("C").withField("F", new_f))
    df.show(truncate=False)


if __name__ == "__main__":
    main()
To fix your code, first change the `example_loop` UDF so that it takes the array of structs (rows) as input and returns the transformed array of structs with the `I` values updated. Also make `example` a plain Python function instead of a UDF, since you normally cannot call a UDF from inside another UDF.
# Return type of example_loop: an array of structs with string fields G and H
# and integer field I.
f_schema = ArrayType(
    StructType(
        [
            StructField(field_name, field_type)
            for field_name, field_type in (
                ("G", StringType()),
                ("H", StringType()),
                ("I", IntegerType()),
            )
        ]
    )
)
# This should not be a UDF: it is called as ordinary Python code.
def example(i, a):
    """Return ``i`` plus 5 when ``a`` equals "querty", otherwise ``i`` plus 10.

    A ``None`` value for ``i`` (a null integer field) is returned unchanged
    instead of raising ``TypeError`` on the addition.
    """
    if i is None:
        return None
    return i + (5 if a == "querty" else 10)
# See the changes in the return type as well as the logic
@udf(returnType=f_schema)
def example_loop(rows, a):
    """Return the structs in ``rows`` with each ``I`` replaced by ``example(I, a)``.

    ``rows`` is the array of struct values passed to the UDF; every element
    is used as a Row (``asDict`` and attribute access). All other fields of
    each element are copied through unchanged. A null array (``None``) is
    returned as ``None``.
    """
    # No ``list[T.Row]`` annotation here: it is evaluated when the function is
    # defined and needs a ``T`` alias that the imports shown do not provide.
    if rows is None:
        return None
    return [{**r.asDict(True), "I": example(r.I, a)} for r in rows]
# Pass the whole array of structs C.F (not just C.F.I) to the UDF and write
# the result back as field F of C.
updated_f = example_loop(col("C.F"), col("A"))
df = df.withColumn("C", col("C").withField("F", updated_f))
df.show(truncate=False)
+------+------+--------------------------------+
|A |B |C |
+------+------+--------------------------------+
|qwerty|ytrewq|{Jon, Smith, [{huhu, haha, 24}]}|
+------+------+--------------------------------+