I have these two DataFrames:
df1:
+---+----------+----------+
|id |id_special|date_1 |
+---+----------+----------+
|1 |101 |2024-11-01|
|2 |102 |2024-11-03|
|3 |103 |2024-11-04|
|4 |null |2024-11-05|
+---+----------+----------+
df2:
+----------+----------+------+
|id_special|date_2 |type |
+----------+----------+------+
|101 |2024-10-30|Type_1|
|101 |2024-10-31|Type_2|
|101 |2024-11-01|Type_3|
|102 |2024-11-03|Type_4|
+----------+----------+------+
My goal is to create a new column named df2_type in df1. To do so, I need a special join between df1 and df2. Here are the rules to create the column df2_type: for each row of df1, take the rows of df2 with the same id_special and a date_2 strictly before date_1, and use the type of the most recent of those rows; if no such row exists (including when id_special is null), the value must be Unknown.
So, from the previous DataFrames, this is the result that I am expecting:
+---+----------+----------+--------+
|id |id_special|date_1 |df2_type|
+---+----------+----------+--------+
|1 |101 |2024-11-01|Type_2 |
|2 |102 |2024-11-03|Unknown |
|3 |103 |2024-11-04|Unknown |
|4 |null |2024-11-05|Unknown |
+---+----------+----------+--------+
I tried to do a join between my two DataFrames, but I have never been able to join them properly. Here is the code that I have:
from awsglue.context import GlueContext
from datetime import date
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
data1 = [
(1, 101, date.fromisoformat("2024-11-01")),
(2, 102, date.fromisoformat("2024-11-03")),
(3, 103, date.fromisoformat("2024-11-04")),
(4, None, date.fromisoformat("2024-11-05")),
]
data2 = [
(101, date.fromisoformat("2024-10-30"), "Type_1"),
(101, date.fromisoformat("2024-10-31"), "Type_2"),
(101, date.fromisoformat("2024-11-01"), "Type_3"),
(102, date.fromisoformat("2024-11-03"), "Type_4"),
]
schema1 = StructType([
StructField("id", IntegerType(), True), # Unique key
StructField("id_special", IntegerType(), True),
StructField("date_1", DateType(), True),
])
schema2 = StructType([
StructField("id_special", IntegerType(), True),
StructField("date_2", DateType(), True),
StructField("type", StringType(), True),
])
df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)
# Step 1 - Add the df2_type column
df1 = df1.withColumn("df2_type", lit(None))
# The final DataFrame need to be like this
# +---+----------+----------+--------+
# |id |id_special|date_1 |df2_type|
# +---+----------+----------+--------+
# |1 |101 |2024-11-01|Type_2 |
# |2 |102 |2024-11-03|Unknown |
# |3 |103 |2024-11-04|Unknown |
# |4 |null |2024-11-05|Unknown |
# +---+----------+----------+--------+
Transform the task into a simple (outer) join and run some post-processing afterwards:
Step 1: group df2 by id_special and collect all rows into a list of structs.
from pyspark.sql import functions as F
df2grouped = df2.groupBy('id_special') \
.agg(F.collect_list(F.struct('date_2', 'type')).alias('data'))
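At this point df2grouped has one row per id_special, with all of its (date_2, type) pairs collected into an array column named data; on the sample data that is roughly 101 -> [(2024-10-30, Type_1), (2024-10-31, Type_2), (2024-11-01, Type_3)] and 102 -> [(2024-11-03, Type_4)]. Note that collect_list does not guarantee any particular order inside the array, which is why the next step sorts it explicitly.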
Step 2: run a left outer join between df1 and df2grouped. Sort the array coming from df2grouped in descending order of date_2, filter out the entries whose date_2 is not strictly before date_1, and then take the type from the first entry of the remaining array. Finally, replace all null values with Unknown.
result = (
    df1.join(df2grouped, on='id_special', how='left')
    # sort the collected structs by date_2, most recent first
    .withColumn('data', F.expr('array_sort(data, (l, r) -> datediff(r.date_2, l.date_2))'))
    # keep only entries whose date_2 is strictly before date_1
    .withColumn('data', F.filter(F.col('data'), lambda x: x.date_2 < F.col('date_1')))
    # the first remaining entry (if any) is the most recent match
    .withColumn('df2_type', F.expr('data[0].type'))
    .drop('data')
    .na.fill('Unknown')  # rows without any match become Unknown
)
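Since the join uses on='id_special', the join key ends up as the first column of result, so to reproduce the exact column layout from the question you can finish with a select (row order after a join is not guaranteed, so it may differ from the display below):
result.select('id', 'id_special', 'date_1', 'df2_type').show(truncate=False)
# +---+----------+----------+--------+
# |id |id_special|date_1    |df2_type|
# +---+----------+----------+--------+
# |1  |101       |2024-11-01|Type_2  |
# |2  |102       |2024-11-03|Unknown |
# |3  |103       |2024-11-04|Unknown |
# |4  |null      |2024-11-05|Unknown |
# +---+----------+----------+--------+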