Tags: python, pyspark, databricks

How can I use a PySpark UDF in a for loop?


I need to apply a PySpark UDF inside a for loop to create new columns, with conditions that depend on the loop variable.

def test_map(col):
    if x == 1:
        if col < 0.55:
            return 1.2
        else:
            return 0.99
    elif x == 2:
        if col < 0.87:
            return 1.5
        else:
            return 2.4
    # etc. for the remaining values of x

test_map_udf = F.udf(test_map, IntegerType())

And then iterate:

for x in range(1, 10):
    df = df.withColumn(f"new_value_{x}", test_map_udf(F.col(f"old_value_{x}")))

But this fails because test_map doesn't know what x is when it runs on the executors, and you can't pass x to test_map_udf directly.

Should I create a regular Python function that takes x, and that function calls the UDF?


Solution

  • You can pass x to the UDF as an extra column argument by wrapping it in F.lit().

    def test_map(x, col):
        ...
    
    # The mapping returns floats, so use DoubleType rather than IntegerType
    test_map_udf = F.udf(test_map, DoubleType())
    
    for x in range(1, 10):
        df = df.withColumn(f"new_value_{x}", test_map_udf(F.lit(x), F.col(f"old_value_{x}")))