I have a DataFrame with two columns: process name and process rank. I want to add two more columns to the DataFrame, using windowing to find the processes with the minimum and maximum ranks and display them on each row.
See the sample columns 'Max Rank Process (output I want using windowing)' and 'Min Rank Process (output I want using windowing 2)' for the output I actually want. It seems windowing may not support F.col("Process").over(...) on a plain column without some sort of aggregate. How can I accomplish this, either with windowing or without it?
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
schema = StructType([
    StructField("Process", StringType(), True),
    StructField("Process_rank", IntegerType(), True),
    StructField("Max Rank Process (output I want using windowing)", StringType(), True),
    StructField("Min Rank Process (output I want using windowing 2)", StringType(), True),
])
data = [
    ("Inventory", 1, "Retire", "Inventory"),
    ("Data availability", 2, "Retire", "Inventory"),
    ("Code Conversion", 3, "Retire", "Inventory"),
    ("Retire", 4, "Retire", "Inventory"),
]
df = spark.createDataFrame(data=data, schema=schema)
# window1: partition by process name, order by rank descending
w_max_rnk = Window.partitionBy("Process").orderBy(F.col("Process_rank").desc())
# window2: partition by process name, order by rank ascending
w_min_rnk = Window.partitionBy("Process").orderBy(F.col("Process_rank").asc())
# windowed columns to find the max- and min-ranked processes in the DataFrame
df = df.withColumn("max_ranked_process", F.col("Process").over(w_max_rnk)) \
    .withColumn("min_ranked_process", F.col("Process").over(w_min_rnk))
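This should give the correct result, although performance is poor and it only works well for smaller DataFrames: because the window has an orderBy but no partitionBy, Spark moves all rows into a single partition to evaluate it.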
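# One window over the whole DataFrame, ordered by rank, whose frame covers every row
w = Window.orderBy('Process_rank').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# In rank order, the last row is the max-ranked process and the first is the min-ranked one
df = (df.withColumn('Max Rank Process', F.last('Process').over(w))
      .withColumn('Min Rank Process', F.first('Process').over(w)))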