I'm struggling with this problem: I have a DataFrame with several columns. One of them, "BKColumns", is of ARRAY type and contains the names of some (not all) of the other columns of the same DataFrame; each row can list a different set of names in "BKColumns". At the end, I would like to add another column to the DataFrame containing the concatenation of the values from the columns named in "BKColumns".
I'll illustrate with a simple example below.
Unfortunately, none of the approaches I tried worked — they threw errors such as "Column is not iterable".
Because the set of referenced columns varies per row, this is awkward to express with the built-in column functions alone, so a Python udf is a straightforward way to solve it.
from pyspark.sql import functions as F
df = spark_session.createDataFrame([
    (1, 2, 3, ['c1', 'c2', 'c3']),
    (3, 4, 5, ['c1', 'c2']),
    (6, 7, 8, ['c2']),
    (9, 10, 11, ['c1', 'c3'])
], ['c1', 'c2', 'c3', 'BKColumns'])
df.show()
# +---+---+---+------------+
# | c1| c2| c3| BKColumns|
# +---+---+---+------------+
# | 1| 2| 3|[c1, c2, c3]|
# | 3| 4| 5| [c1, c2]|
# | 6| 7| 8| [c2]|
# | 9| 10| 11| [c1, c3]|
# +---+---+---+------------+
@F.udf
def concat_bk(c1, c2, c3, bk_columns):
    # Map each column name to this row's value, then join the requested ones.
    d = {
        'c1': c1,
        'c2': c2,
        'c3': c3
    }
    return '_'.join(str(d[x]) for x in bk_columns if x in d)

df = df.withColumn('result', concat_bk(F.col('c1'), F.col('c2'), F.col('c3'), F.col('BKColumns')))
df.show()
# +---+---+---+------------+------+
# | c1| c2| c3| BKColumns|result|
# +---+---+---+------------+------+
# | 1| 2| 3|[c1, c2, c3]| 1_2_3|
# | 3| 4| 5| [c1, c2]| 3_4|
# | 6| 7| 8| [c2]| 7|
# | 9| 10| 11| [c1, c3]| 9_11|
# +---+---+---+------------+------+
But it's still not fully dynamic — the udf has to spell out the column names, so it can't be applied to an arbitrary DataFrame.
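One way to remove the hard-coded names is to pack all data columns into a struct and let the udf look names up in the resulting Row. This is a sketch, not a definitive implementation; `with_bk_concat` and `concat_bk_values` are hypothetical helper names, and it assumes "BKColumns" only ever lists columns that exist in the DataFrame:

```python
def concat_bk_values(values_by_name, bk_columns, sep='_'):
    # Pure-Python core: join this row's values for the requested column names,
    # in the order they appear in bk_columns, skipping unknown names.
    return sep.join(str(values_by_name[k]) for k in bk_columns if k in values_by_name)


def with_bk_concat(df, bk_col='BKColumns', out_col='result', sep='_'):
    """Add a column concatenating the values of the columns named in `bk_col`."""
    from pyspark.sql import functions as F  # local import keeps the core testable without Spark

    value_cols = [c for c in df.columns if c != bk_col]
    concat = F.udf(lambda row, bk: concat_bk_values(row.asDict(), bk, sep))
    # Packing every data column into a struct means no column name is hard-coded:
    # the udf receives a Row and turns it into a dict at runtime.
    return df.withColumn(out_col, concat(F.struct(*value_cols), F.col(bk_col)))
```

With the example DataFrame above, `df = with_bk_concat(df)` should produce the same `result` column without mentioning c1, c2, or c3 anywhere. On newer Spark versions it may also be possible to avoid the Python udf entirely with higher-order functions (e.g. `transform` over "BKColumns" against a map built with `create_map`), which would keep the work inside the JVM.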