
Create DataFrame columns from a dictionary using a lambda expression


I would like to create the columns of a PySpark DataFrame dynamically, using a lambda, from a given dictionary. Below is my dictionary (actually a list of dictionaries):

cols = [
{'Technical Label':'UserID', 'Column Mapping':'UID','Technical Column':'No'},
{'Technical Label':'Type', 'Column Mapping':'X_TYPE','Technical Column':'No'},
{'Technical Label':'Name.1', 'Column Mapping':'NaturalName','Technical Column':'No'},
{'Technical Label':'Address.1', 'Column Mapping':'HomeAddress','Technical Column':'Yes'},
{'Technical Label':'Identifier.1', 'Column Mapping':'Human','Technical Column':'Yes'},
{'Technical Label':'Identifier.1', 'Column Mapping':'EX_CO','Technical Column':'Yes'},
{'Technical Label':'Identifier.IdentifierValue.1', 'Column Mapping':'X_CODE','Technical Column':'No'}
]
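The label 'Identifier.1' appears twice in this list. Because withColumn replaces any existing column with the same name, the second mapping (EX_CO) would silently overwrite the first (Human). A small pure-Python check can flag this before any Spark code runs; this is only a sketch, and the helper name find_duplicate_labels is hypothetical:

```python
from collections import Counter

# Same mapping list as in the question
cols = [
    {'Technical Label': 'UserID', 'Column Mapping': 'UID', 'Technical Column': 'No'},
    {'Technical Label': 'Type', 'Column Mapping': 'X_TYPE', 'Technical Column': 'No'},
    {'Technical Label': 'Name.1', 'Column Mapping': 'NaturalName', 'Technical Column': 'No'},
    {'Technical Label': 'Address.1', 'Column Mapping': 'HomeAddress', 'Technical Column': 'Yes'},
    {'Technical Label': 'Identifier.1', 'Column Mapping': 'Human', 'Technical Column': 'Yes'},
    {'Technical Label': 'Identifier.1', 'Column Mapping': 'EX_CO', 'Technical Column': 'Yes'},
    {'Technical Label': 'Identifier.IdentifierValue.1', 'Column Mapping': 'X_CODE', 'Technical Column': 'No'},
]

def find_duplicate_labels(mappings):
    """Return target labels that occur more than once (hypothetical helper)."""
    counts = Counter(m['Technical Label'] for m in mappings)
    return sorted(label for label, n in counts.items() if n > 1)

print(find_duplicate_labels(cols))  # ['Identifier.1']
```

If both values are needed, one of the two labels would have to be renamed; otherwise only the last mapping for a label survives.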

I want the equivalent of:

refdf = df.withColumn('UserID', df.UID).withColumn('Type', df.X_TYPE).withColumn('Name.1', df.NaturalName).withColumn('Address.1', lit('HomeAddress'))... and so on

I want to decide based on the Technical Column key of each dictionary. If the value is No, I want to take the values from the existing DataFrame column named in Column Mapping; if the value is Yes, I want to use lit() to set the column to the Column Mapping value itself.
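The rule above is a fold: start from the original data and, for each mapping, add one column whose value is either copied from an existing column (No) or set to the constant mapping string (Yes). A Spark-free sketch of the same logic on a single record helps check what each row should end up containing. Here a plain dict stands in for a row, and the sample values and the helper name apply_mappings are hypothetical:

```python
from functools import reduce

cols = [
    {'Technical Label': 'UserID', 'Column Mapping': 'UID', 'Technical Column': 'No'},
    {'Technical Label': 'Type', 'Column Mapping': 'X_TYPE', 'Technical Column': 'No'},
    {'Technical Label': 'Name.1', 'Column Mapping': 'NaturalName', 'Technical Column': 'No'},
    {'Technical Label': 'Address.1', 'Column Mapping': 'HomeAddress', 'Technical Column': 'Yes'},
    {'Technical Label': 'Identifier.1', 'Column Mapping': 'Human', 'Technical Column': 'Yes'},
    {'Technical Label': 'Identifier.1', 'Column Mapping': 'EX_CO', 'Technical Column': 'Yes'},
    {'Technical Label': 'Identifier.IdentifierValue.1', 'Column Mapping': 'X_CODE', 'Technical Column': 'No'},
]

def apply_mappings(row, mappings):
    """Fold the mappings over one record, mimicking chained withColumn calls (hypothetical helper)."""
    def step(acc, m):
        if m['Technical Column'] == 'No':
            value = acc[m['Column Mapping']]  # copy an existing field, like col(...)
        else:
            value = m['Column Mapping']       # constant string, like lit(...)
        # Add or overwrite the target field without mutating the input, like withColumn
        return {**acc, m['Technical Label']: value}
    return reduce(step, mappings, row)

# Hypothetical record containing the source columns the 'No' mappings need
record = {'UID': 'u1', 'X_TYPE': 't', 'NaturalName': 'Ann', 'X_CODE': 'c9'}
result = apply_mappings(record, cols)
print(result['Address.1'])     # HomeAddress
print(result['Identifier.1'])  # EX_CO
```

The accumulator is passed from step to step exactly as reduce passes the growing DataFrame in the Spark version; the key point is that the lambda (here, step) is actually called with each mapping.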

I wrote the code below but got an error.

from functools import reduce
from pyspark.sql.functions import col, lit
df = spark.read.format('csv').option('header','true').option('delimiter','|').load('/my/path/file.txt')
x = reduce(lambda df,colsm : df.withColumn(colsm['Technical Label'], (lambda x: x['Column Mapping'] if x['Technical Column'] == 'No' else lit(x['Column Mapping']) , cols)), cols, df)
display(x)

Error

PySparkTypeError: [NOT_COLUMN] Argument col should be a Column, got tuple.

I cannot figure out what is wrong.


Solution

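  • The error occurs because the second argument you pass to withColumn is (lambda x: ..., cols). The comma inside the parentheses makes that a tuple holding a lambda and the list cols, and the lambda itself is never called, while withColumn expects a Column. Even if the lambda were called, the No branch would return a plain string such as 'UID' rather than a Column, so it also needs to be wrapped in col(). Move the decision into a helper function that returns a Column, and call it for each dictionary: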

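    from functools import reduce
    from pyspark.sql.functions import col, lit

    def create_column_expression(col_dict):
        # 'No' -> copy the values of the existing column named in 'Column Mapping'
        if col_dict['Technical Column'] == 'No':
            return col(col_dict['Column Mapping'])
        # 'Yes' -> use the mapping string itself as a constant value
        return lit(col_dict['Column Mapping'])

    # Fold over the list: each step adds (or replaces) one column on the accumulated DataFrame.
    # withColumn already names the column, so no alias() is needed on the expression.
    x = reduce(lambda acc, col_dict: acc.withColumn(col_dict['Technical Label'], create_column_expression(col_dict)), cols, df)
    display(x)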
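To see where the tuple in the error message comes from, it can help to evaluate the second argument of the original withColumn call on its own. The sketch below uses a shortened stand-in for cols and replaces the lit(...) branch with a plain string so that it runs without Spark; both substitutions are assumptions made for illustration only:

```python
# Shortened stand-in for the question's mapping list
cols = [{'Technical Label': 'UserID', 'Column Mapping': 'UID', 'Technical Column': 'No'}]

# Same shape as the question's second argument: (lambda ..., cols)
# The 'literal' string stands in for lit(...) so no Spark is needed.
arg = (lambda x: x['Column Mapping'] if x['Technical Column'] == 'No' else 'literal', cols)

print(type(arg).__name__)  # tuple
print(len(arg))            # 2: the lambda and the list, and the lambda was never invoked

# Even when called, the 'No' branch yields a plain string, not a Column
print(arg[0](cols[0]))     # UID
```

This is why the fix both calls the decision logic for each dictionary and wraps the No branch in col().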