Tags: python, apache-spark, pyspark, apache-spark-sql, partition-by

Rank date column based on dates in another table date column


I have the following tables

df_obs.show()

+-------+-------------+
|Item No|Date_Observed|
+-------+-------------+
| Item 1|   2021-09-20|
| Item 1|   2022-12-05|
| Item 2|   2022-10-27|
| Item 1|   2022-09-20|
| Item 2|   2023-02-20|
| Item 2|   2023-03-20|
| Item 1|   2023-01-20|
+-------+-------------+
df_purchase.withColumn("rank", dense_rank().over(Window.partitionBy("ITEM_No").orderBy(asc("Date_Purchase")))).show()

+-------+-------------+----+
|Item_No|Date_Purchase|rank|
+-------+-------------+----+
| Item 1|   2021-08-21|   1|
| Item 1|   2022-02-23|   2|
| Item 1|   2022-12-29|   3|
| Item 2|   2022-09-20|   1|
| Item 2|   2023-01-20|   2|
+-------+-------------+----+

I want to rank the Date_Observed column in df_obs based on the rank column in df_purchase, i.e. assign each observation the rank of the purchase-date range it falls into.

Needed Output

+-------+-------------+----+
|Item No|Date_Observed|rank|
+-------+-------------+----+
| Item 1|   2021-09-20|   1|
| Item 1|   2022-12-05|   2|
| Item 2|   2022-10-27|   1|
| Item 1|   2022-09-20|   2|
| Item 2|   2023-02-20|   2|
| Item 2|   2023-03-20|   2|
| Item 1|   2023-01-20|   3|
+-------+-------------+----+

Example: row 2 in df_obs gets rank 2 because its Date_Observed (2022-12-05) falls between 2022-02-23 (rank 2) and 2022-12-29 (rank 3) in the df_purchase table.
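
For reproducibility, here is a minimal sketch that builds the two sample DataFrames shown above (it assumes an existing SparkSession named spark and parses the date strings with to_date):

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Observation data copied from the df_obs table above
    df_obs = spark.createDataFrame(
        [("Item 1", "2021-09-20"), ("Item 1", "2022-12-05"), ("Item 2", "2022-10-27"),
         ("Item 1", "2022-09-20"), ("Item 2", "2023-02-20"), ("Item 2", "2023-03-20"),
         ("Item 1", "2023-01-20")],
        ["Item No", "Date_Observed"],
    ).withColumn("Date_Observed", F.to_date("Date_Observed"))

    # Purchase data copied from the df_purchase table above
    df_purchase = spark.createDataFrame(
        [("Item 1", "2021-08-21"), ("Item 1", "2022-02-23"), ("Item 1", "2022-12-29"),
         ("Item 2", "2022-09-20"), ("Item 2", "2023-01-20")],
        ["Item_No", "Date_Purchase"],
    ).withColumn("Date_Purchase", F.to_date("Date_Purchase"))

    # Rank purchases by date within each item, as in the query above
    df_purchase = df_purchase.withColumn(
        "rank",
        F.dense_rank().over(Window.partitionBy("Item_No").orderBy(F.asc("Date_Purchase"))),
    )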


Solution

  • Define the join condition to match rows that have the same item number and whose observed date in the left dataframe (df_obs) is greater than or equal to the purchase date in the right one (df_purchase). Join the dataframes on this condition, then group by the df_obs columns and aggregate rank with max, so each observation keeps the rank of the latest purchase made on or before it.

    from pyspark.sql import functions as F

    # Rows match when the item numbers agree and the observation happened on
    # or after the purchase
    cond = (df_obs['Item No'] == df_purchase['Item_No']) & \
           (df_obs['Date_Observed'] >= df_purchase['Date_Purchase'])

    # Left join on that condition, then keep the highest matching purchase
    # rank per observation
    result = (
        df_obs
        .join(df_purchase, on=cond, how='left')
        .groupBy(*df_obs.columns)
        .agg(F.max('rank').alias('rank'))
    )
    result.show()
    

    Result

    +-------+-------------+----+
    |Item No|Date_Observed|rank|
    +-------+-------------+----+
    | Item 1|   2021-09-20|   1|
    | Item 1|   2022-12-05|   2|
    | Item 2|   2022-10-27|   1|
    | Item 1|   2022-09-20|   2|
    | Item 2|   2023-02-20|   2|
    | Item 2|   2023-03-20|   2|
    | Item 1|   2023-01-20|   3|
    +-------+-------------+----+
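
    One thing to be aware of with the left join: if an observation date precedes every purchase date for its item, no purchase row matches and the aggregated rank comes out null. A minimal sketch, assuming such rows should default to rank 0:

        # Replace null ranks (observations made before any purchase) with 0
        result = result.withColumn('rank', F.coalesce(F.col('rank'), F.lit(0)))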