Dataframe:
Above is my dataframe. I want to add a new column with value 1 if the first transaction_date for an item is after 01.01.2022, else 0. To do this I use the window partition code below:
from pyspark.sql import Window
from pyspark.sql import functions as f
from pyspark.sql.functions import row_number, when
windowSpec = Window.partitionBy("article_id").orderBy("transaction_date")
feature_grid = feature_grid.withColumn("row_number", row_number().over(windowSpec)) \
    .withColumn('new_item',
        when(
            (f.col('row_number') == 1) & (f.col('transaction_date') >= '01.01.2022'), 1).otherwise(0)) \
    .drop('row_number')
I want to perform clustering on the dataframe, for which I am using VectorAssembler with the code below:
from pyspark.ml.feature import VectorAssembler
input_cols = feature_grid.columns
assemble = VectorAssembler(inputCols=input_cols, outputCol='features')
assembled_data = assemble.transform(feature_grid)
For standardisation:
from pyspark.ml.feature import StandardScaler
scale = StandardScaler(inputCol='features', outputCol='standardized')
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)
display(data_scale_output)
The standardisation code chunk gives me the error below, but only when I use the partitioning method above. Without that partitioning step, the code works fine.
Error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 182.0 failed 4 times, most recent failure: Lost task 0.3 in stage 182.0 (TID 3635) (10.205.234.124 executor 1): org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3621/907379691
Can someone tell me what I am doing wrong here, or what the cause of the error is?
This error is triggered by null values in the columns that are assembled by the Spark VectorAssembler. Please fill the null values in those columns before transforming your dataframe.