How to join 2 DataFrames on a really specific condition?


I have these 2 DataFrames:

df1:
+---+----------+----------+
|id |id_special|date_1    |
+---+----------+----------+
|1  |101       |2024-11-01|
|2  |102       |2024-11-03|
|3  |103       |2024-11-04|
|4  |null      |2024-11-05|
+---+----------+----------+
df2:
+----------+----------+------+
|id_special|date_2    |type  |
+----------+----------+------+
|101       |2024-10-30|Type_1|
|101       |2024-10-31|Type_2|
|101       |2024-11-01|Type_3|
|102       |2024-11-03|Type_4|
+----------+----------+------+

My goal is to create a new column named df2_type in df1. To do so, I need a special join between df1 and df2. Here are the rules for creating the column df2_type.

  1. If df1.id_special is null, set df1.df2_type to "Unknown".
  2. If df1.id_special is not in df2.id_special, set df1.df2_type to "Unknown".
  3. If df1.id_special is in df2.id_special:
    1. Get the record where df2.date_2 < df1.date_1 and df2.date_2 is closest to df1.date_1.
    2. From that record, use df2.type to set df2_type. If no such record exists (e.g. id 2, where the only date_2 is not strictly earlier), set df2_type to "Unknown" as well.

So, from the previous DataFrames, this is the result that I am expecting:

+---+----------+----------+--------+
|id |id_special|date_1    |df2_type|
+---+----------+----------+--------+
|1  |101       |2024-11-01|Type_2  |
|2  |102       |2024-11-03|Unknown |
|3  |103       |2024-11-04|Unknown |
|4  |null      |2024-11-05|Unknown |
+---+----------+----------+--------+

I tried to do a join between my 2 DataFrames, but I have never been able to join them properly. Here is the code that I have:

from awsglue.context import GlueContext
from datetime import date
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session  # the SparkSession used below

data1 = [
    (1, 101, date.fromisoformat("2024-11-01")),
    (2, 102, date.fromisoformat("2024-11-03")),
    (3, 103, date.fromisoformat("2024-11-04")),
    (4, None, date.fromisoformat("2024-11-05")),
]

data2 = [
    (101, date.fromisoformat("2024-10-30"), "Type_1"),
    (101, date.fromisoformat("2024-10-31"), "Type_2"),
    (101, date.fromisoformat("2024-11-01"), "Type_3"),
    (102, date.fromisoformat("2024-11-03"), "Type_4"),
]

schema1 = StructType([
    StructField("id", IntegerType(), True),  # Unique key
    StructField("id_special", IntegerType(), True),
    StructField("date_1", DateType(), True),
])

schema2 = StructType([
    StructField("id_special", IntegerType(), True),
    StructField("date_2", DateType(), True),
    StructField("type", StringType(), True),
])


df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)


# Step 1 - Add df2_type column (cast so it is a string column, not a null column)
df1 = df1.withColumn("df2_type", lit(None).cast(StringType()))


# The final DataFrame need to be like this
# +---+----------+----------+--------+
# |id |id_special|date_1    |df2_type|
# +---+----------+----------+--------+
# |1  |101       |2024-11-01|Type_2  |
# |2  |102       |2024-11-03|Unknown |
# |3  |103       |2024-11-04|Unknown |
# |4  |null      |2024-11-05|Unknown |
# +---+----------+----------+--------+

Solution

  • Transform the task into a simple (outer) join and run some post-processing afterwards:

    Step 1: group df2 by id_special and collect all rows into a list of structs.

    from pyspark.sql import functions as F
    
    df2grouped = df2.groupBy('id_special') \
        .agg(F.collect_list(F.struct('date_2', 'type')).alias('data'))
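
    For reference, this is roughly what df2grouped looks like. Note that collect_list gives no ordering guarantee for the array, which is exactly why it is sorted explicitly in the next step:

    df2grouped.show(truncate=False)
    # Array order and struct rendering may vary by Spark version and run:
    # +----------+--------------------------------------------------------------------+
    # |id_special|data                                                                |
    # +----------+--------------------------------------------------------------------+
    # |101       |[{2024-10-30, Type_1}, {2024-10-31, Type_2}, {2024-11-01, Type_3}] |
    # |102       |[{2024-11-03, Type_4}]                                              |
    # +----------+--------------------------------------------------------------------+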
    

    Step 2: run a left outer join between df1 and df2grouped. Sort the array descending by date_2, filter out the entries whose date_2 is not strictly before date_1, then take the type from the first entry of the remaining array. Finally, replace the remaining null values with "Unknown".

    result = (
        df1.join(df2grouped, on='id_special', how='left')
        # Sort the array by date_2 descending (the comparator is positive when l is older).
        .withColumn('data', F.expr('array_sort(data, (l, r) -> datediff(r.date_2, l.date_2))'))
        # Keep only the entries strictly before date_1.
        .withColumn('data', F.filter(F.col('data'), lambda x: x.date_2 < F.col('date_1')))
        # The first remaining entry holds the closest earlier date (null if none).
        .withColumn('df2_type', F.expr('data[0].type'))
        .drop('data')
        .na.fill('Unknown')
    )
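
    With the chain assigned to result as above, a quick check reproduces the expected table (the join puts id_special first and may reorder rows, hence the select):

    result.select('id', 'id_special', 'date_1', 'df2_type').show()
    # +---+----------+----------+--------+
    # |id |id_special|date_1    |df2_type|
    # +---+----------+----------+--------+
    # |1  |101       |2024-11-01|Type_2  |
    # |2  |102       |2024-11-03|Unknown |
    # |3  |103       |2024-11-04|Unknown |
    # |4  |null      |2024-11-05|Unknown |
    # +---+----------+----------+--------+

    One caveat: na.fill('Unknown') replaces nulls in every string column of the DataFrame. That is safe here because df2_type is the only string column left after drop('data').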
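
  • For comparison, the same logic can also be written without arrays: a minimal sketch using a non-equi left join plus a window function (joined, w and result are illustrative names):

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # Keep df2 rows with the same id_special and a strictly earlier date_2.
    joined = df1.join(
        df2,
        (df1.id_special == df2.id_special) & (df2.date_2 < df1.date_1),
        how='left',
    ).drop(df2.id_special)

    # id is unique in df1, so rank each row's matches by date_2 descending;
    # unmatched rows carry nulls and trivially get rn = 1.
    w = Window.partitionBy('id').orderBy(F.col('date_2').desc())

    result = joined.withColumn('rn', F.row_number().over(w)) \
        .filter(F.col('rn') == 1) \
        .select('id', 'id_special', 'date_1',
                F.coalesce('type', F.lit('Unknown')).alias('df2_type'))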