I want to create a new column that contains an array of values for the column names listed in the lookup column.
from pyspark.sql import Row
from pyspark.sql import functions as F

input_df = spark.createDataFrame([
Row(id=123, alert=1, operation=1, lookup=[]),
Row(id=234, alert=0, operation=0, lookup=['alert']),
Row(id=345, alert=1, operation=0, lookup=['operation']),
Row(id=456, alert=0, operation=1, lookup=['alert', 'operation']),
])
id | alert | operation | lookup | lookup_values
---|---|---|---|---
123 | 1 | 1 | [] | []
234 | 0 | 0 | [alert] | [0]
345 | 1 | 0 | [operation] | [0]
456 | 0 | 1 | [alert, operation] | [0, 1]
input_df.withColumn("lookup_values", F.transform(F.col("lookup"), lambda x: input_df[f'{x}'])).show()
Fails with the error:

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with the name Column<'x_1'> cannot be resolved. Did you mean one of the following? [id, alert, operation, lookup].
This error is surprising because the following code does not produce an error, although it does not yield the intended result:
input_df.withColumn("lookup_values", F.transform(F.col("lookup"), lambda x: input_df['alert'])).show()
id | alert | operation | lookup | lookup_values
---|---|---|---|---
123 | 1 | 1 | [] | []
234 | 0 | 0 | [alert] | [0]
345 | 1 | 0 | [operation] | [1]
456 | 0 | 1 | [alert, operation] | [0, 0]
Here is an answer without a UDF, using only built-in functions. It should be faster on large volumes of data:
from pyspark.sql import functions as F

input_df.withColumn(
    # build a map from column name to that column's value for the current row
    "lookup_values",
    F.create_map(
        [F.lit("alert"), F.col("alert"), F.lit("operation"), F.col("operation")]
    ),
).withColumn(
    # replace each column name in `lookup` with its value from the map
    "lookup_values",
    F.transform(F.col("lookup"), lambda x: F.col("lookup_values")[x])
).show()
+---+-----+---------+------------------+-------------+
| id|alert|operation| lookup|lookup_values|
+---+-----+---------+------------------+-------------+
|123| 1| 1| []| []|
|234| 0| 0| [alert]| [0]|
|345| 1| 0| [operation]| [0]|
|456| 0| 1|[alert, operation]| [0, 1]|
+---+-----+---------+------------------+-------------+
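As for why the original attempt fails: inside F.transform, the lambda argument x is a Column expression, not a Python string, so input_df[f'{x}'] interpolates the column's repr (Column<'x_1'>), which Spark then cannot resolve as a column name. That is also why the input_df['alert'] variant runs without error but gives the wrong result: it is a plain column reference that ignores x, so every element of lookup just receives the row's alert value. The create_map approach above lets Spark do the name-to-value lookup per array element instead.

For comparison only, the same logic with a UDF would look roughly like the sketch below (my own illustration, not part of the original answer; the name lookup_values_udf is arbitrary). Every row's values have to be shipped to Python, which is why the built-in version above should scale better.

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

@F.udf(returnType=ArrayType(IntegerType()))
def lookup_values_udf(lookup, alert, operation):
    # map column names to the values passed in for this row
    values = {"alert": alert, "operation": operation}
    return [values[name] for name in lookup]

input_df.withColumn(
    "lookup_values",
    lookup_values_udf(F.col("lookup"), F.col("alert"), F.col("operation")),
).show()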