Tags: apache-spark, pyspark, partitioning, standardization

Getting an error while doing standardization after window partitioning of a PySpark DataFrame


Dataframe: (screenshot of the input dataframe)

Above is my dataframe. I want to add a new column with value 1 if the first transaction_date for an item is after 01.01.2022, else 0. To do this I use the window partition code below:

from pyspark.sql import Window
from pyspark.sql import functions as f

windowSpec = Window.partitionBy("article_id").orderBy("transaction_date")

feature_grid = feature_grid.withColumn("row_number", f.row_number().over(windowSpec)) \
    .withColumn('new_item',
                f.when(
                    (f.col('row_number') == 1) & (f.col('transaction_date') >= '01.01.2022'), 1).otherwise(0)) \
    .drop('row_number')

I want to perform clustering on the dataframe, for which I am using VectorAssembler with the code below:

from pyspark.ml.feature import VectorAssembler

input_cols = feature_grid.columns

assemble = VectorAssembler(inputCols=input_cols, outputCol='features')
assembled_data = assemble.transform(feature_grid)

For standardisation:

from pyspark.ml.feature import StandardScaler

scale = StandardScaler(inputCol='features', outputCol='standardized')
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)
display(data_scale_output)

The standardisation code chunk gives me the error below, but only when I use the partitioning method above; without that partitioning method, the code works fine.

Error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 182.0 failed 4 times, most recent failure: Lost task 0.3 in stage 182.0 (TID 3635) (10.205.234.124 executor 1): org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3621/907379691

Can someone tell me what I am doing wrong here, or what the cause of the error is?


Solution

  • This error is triggered by null values in the columns that are being assembled by the Spark VectorAssembler. Please fill the null columns before transforming your dataframe (see the sketch below).
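
For example, a minimal sketch (assuming the feature_grid dataframe from the question, and that replacing missing numeric values with 0 is acceptable for your features; adjust the fill value to whatever makes sense for your data):

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as f

# 1. See which columns actually contain nulls.
null_counts = feature_grid.select(
    [f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in feature_grid.columns]
)
null_counts.show()

# 2. Fill the nulls before assembling (0 is only a placeholder choice here).
feature_grid = feature_grid.fillna(0)

assemble = VectorAssembler(inputCols=feature_grid.columns, outputCol='features')
assembled_data = assemble.transform(feature_grid)

Alternatively, if filling the values is not appropriate for your use case, VectorAssembler accepts a handleInvalid parameter ("skip" or "keep") that skips or keeps rows containing nulls instead of raising an error.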