Tags: python, apache-spark, pyspark, databricks

PySpark UDF mapping is returning empty columns


Given a DataFrame, I want to apply a mapping with a UDF, but I'm getting empty columns.

data = [(1, 3),
        (2, 3),
        (3, 5),
        (4, 10),
        (5, 20)]

df = spark.createDataFrame(data, ["int_1", "int_2"])
df.show()
+-----+-----+
|int_1|int_2|
+-----+-----+
|    1|    3|
|    2|    3|
|    3|    5|
|    4|   10|
|    5|   20|
+-----+-----+

I have a mapping:

def test_map(col):
    if col < 5:
        score = 'low'
    else:
        score = 'high'
    return score


from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

mapp = {}
test_udf = F.udf(test_map, IntegerType())

I iterate here to populate mapp:

for x in (1, 2):
    print(f'Now working {x}')
    mapp[f'limit_{x}'] = test_udf(F.col(f'int_{x}'))

print(mapp)
{'limit_1': Column<'test_map(int_1)'>, 'limit_2': Column<'test_map(int_2)'>}

df.withColumns(mapp).show()
+-----+-----+-------+-------+
|int_1|int_2|limit_1|limit_2|
+-----+-----+-------+-------+
|    1|    3|   NULL|   NULL|
|    2|    3|   NULL|   NULL|
|    3|    5|   NULL|   NULL|
|    4|   10|   NULL|   NULL|
|    5|   20|   NULL|   NULL|
+-----+-----+-------+-------+

The problem is that I get null columns. What I'm expecting is:

+-----+-----+-------+-------+
|int_1|int_2|limit_1|limit_2|
+-----+-----+-------+-------+
|    1|    3|    low|    low|
|    2|    3|    low|    low|
|    3|    5|    low|    low|
|    4|   10|    low|   high|
|    5|   20|    low|   high|
+-----+-----+-------+-------+

The reason I'm doing it this way is that I have to do this for 100 columns, and I've read that a single withColumns call with a mapping is much faster than chaining withColumn many times.


Solution

  • Your problem is that your UDF is registered to return an integer (IntegerType()), while your Python function actually returns a string ("low" or "high"). When the value a UDF returns can't be converted to its declared return type, Spark silently produces null, which is why every row comes back NULL. The fix is to declare StringType() as the UDF's return type:

    from pyspark.sql.types import StringType

    test_udf = F.udf(test_map, StringType())
    

    Let me know if you want more explanation about UDFs!
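    Putting it all together, here is a minimal end-to-end sketch of the fix (it assumes PySpark 3.3+, where DataFrame.withColumns is available, and builds a local SparkSession for testing; on Databricks you would use the existing spark session instead):

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    # Local session just for demonstration; on Databricks `spark` already exists.
    spark = SparkSession.builder.master("local[1]").appName("udf-fix").getOrCreate()

    def test_map(col):
        # Same mapping as in the question.
        return 'low' if col < 5 else 'high'

    # Declare StringType() to match what the Python function returns.
    test_udf = F.udf(test_map, StringType())

    df = spark.createDataFrame(
        [(1, 3), (2, 3), (3, 5), (4, 10), (5, 20)],
        ["int_1", "int_2"],
    )

    # Build the column mapping once, then apply it in a single withColumns call.
    mapp = {f'limit_{x}': test_udf(F.col(f'int_{x}')) for x in (1, 2)}
    result = df.withColumns(mapp)
    result.show()
    ```

    With the return type corrected, the limit_1/limit_2 columns contain the expected 'low'/'high' values instead of NULL.
    
    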