I need to do a vertical check on my dataset in PySpark to flag only the rows that match a condition.
In detail: I have to flag only the rows where a "PURCHASE + Seller" is preceded, within the same id, by a "SALE + Customer" (marked in the example below), and report the difference in days from the most recent preceding sale.
Example:
Input
id | order_type | Initiative | date |
---|---|---|---|
1 | PURCHASE | Seller | 2022-02-11 |
1 | PURCHASE | Seller | 2022-02-10 |
1 | PURCHASE | Seller | 2022-02-09 |
1 | SALE | Customer | 2022-02-08 |
1 | SALE | Customer | 2022-02-07 |
1 | SALE | Customer | 2022-02-06 |
1 | PURCHASE | Seller | 2022-02-05 |
1 | SALE | Customer | 2022-02-04 |
1 | PURCHASE | Seller | 2022-02-03 (note: no preceding sale) |
2 | PURCHASE | Customer | 2022-02-11 |
Output
id | order_type | Initiative | date | flag | difference (in days) |
---|---|---|---|---|---|
1 | PURCHASE | Seller | 2022-02-11 | 1 | 3 |
1 | PURCHASE | Seller | 2022-02-10 | 1 | 2 |
1 | PURCHASE | Seller | 2022-02-09 | 1 | 1 |
1 | SALE | Customer | 2022-02-08 | 0 | |
1 | SALE | Customer | 2022-02-07 | 0 | |
1 | SALE | Customer | 2022-02-06 | 0 | |
1 | PURCHASE | Seller | 2022-02-05 | 1 | 1 |
1 | SALE | Customer | 2022-02-04 | 0 | |
1 | PURCHASE | Seller | 2022-02-03 | 0 (condition is not satisfied) | |
2 | PURCHASE | Customer | 2022-02-11 | 0 | |
Here's my implementation:
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from pyspark.sql import Window

df = spark.createDataFrame(
    [
        ("1", "PURCHASE", "Seller", "2022-02-11"),
        ("1", "PURCHASE", "Seller", "2022-02-10"),
        ("1", "PURCHASE", "Seller", "2022-02-09"),
        ("1", "SALE", "Customer", "2022-02-08"),
        ("1", "SALE", "Customer", "2022-02-07"),
        ("1", "SALE", "Customer", "2022-02-06"),
        ("1", "PURCHASE", "Seller", "2022-02-05"),
        ("1", "SALE", "Customer", "2022-02-04"),
        ("1", "PURCHASE", "Seller", "2022-02-03"),
        ("2", "PURCHASE", "Customer", "2022-02-11"),
    ],
    ["id", "order_type", "Initiative", "date"],
)
# Cast the string dates to real dates so comparisons and datediff work
df = df.withColumn("date", F.col("date").cast(DateType()))
df.show()
# Keep only the SALE + Customer rows: the candidate "preceding sales"
sale_df = df.filter(
    (F.lower(F.col("order_type")) == "sale")
    & (F.lower(F.col("Initiative")) == "customer")
)
sale_df.show()
# Partition by every column of the original row so that, after the join fans
# out one purchase into several matching sales, row_number can keep exactly
# one match per original row; ordering by s.date desc makes that match the
# most recent preceding sale.
row_window = Window.partitionBy(
    "df.id",
    "df.order_type",
    "df.Initiative",
    "df.date",
).orderBy(F.col("s.date").desc())
final_df = (
    df.alias("df")
    .join(
        sale_df.alias("s"),
        on=(
            # Pair each PURCHASE + Seller row with every strictly earlier sale;
            # all other rows keep null s.* columns thanks to the left join
            (F.col("s.date") < F.col("df.date"))
            & (F.lower(F.col("df.order_type")) == "purchase")
            & (F.lower(F.col("df.Initiative")) == "seller")
        ),
        how="left",
    )
    # Keep only the most recent preceding sale for each original row
    .withColumn("row_num", F.row_number().over(row_window))
    .filter(F.col("row_num") == 1)
    .withColumn("day_diff", F.datediff(F.col("df.date"), F.col("s.date")))
    # Flag the rows that actually found a preceding sale
    .withColumn(
        "flag",
        F.when(F.col("s.id").isNull(), F.lit(0)).otherwise(F.lit(1)),
    )
    .select("df.*", "flag", "day_diff")
    .orderBy(F.col("df.id").asc(), F.col("df.date").desc())
)
final_df.show()
Outputs of df.show() and sale_df.show():
+---+----------+----------+----------+
| id|order_type|Initiative| date|
+---+----------+----------+----------+
| 1| PURCHASE| Seller|2022-02-11|
| 1| PURCHASE| Seller|2022-02-10|
| 1| PURCHASE| Seller|2022-02-09|
| 1| SALE| Customer|2022-02-08|
| 1| SALE| Customer|2022-02-07|
| 1| SALE| Customer|2022-02-06|
| 1| PURCHASE| Seller|2022-02-05|
| 1| SALE| Customer|2022-02-04|
| 1| PURCHASE| Seller|2022-02-03|
| 2| PURCHASE| Customer|2022-02-11|
+---+----------+----------+----------+
+---+----------+----------+----------+
| id|order_type|Initiative| date|
+---+----------+----------+----------+
| 1| SALE| Customer|2022-02-08|
| 1| SALE| Customer|2022-02-07|
| 1| SALE| Customer|2022-02-06|
| 1| SALE| Customer|2022-02-04|
+---+----------+----------+----------+
Final output:
+---+----------+----------+----------+----+--------+
| id|order_type|Initiative| date|flag|day_diff|
+---+----------+----------+----------+----+--------+
| 1| PURCHASE| Seller|2022-02-11| 1| 3|
| 1| PURCHASE| Seller|2022-02-10| 1| 2|
| 1| PURCHASE| Seller|2022-02-09| 1| 1|
| 1| SALE| Customer|2022-02-08| 0| null|
| 1| SALE| Customer|2022-02-07| 0| null|
| 1| SALE| Customer|2022-02-06| 0| null|
| 1| PURCHASE| Seller|2022-02-05| 1| 1|
| 1| SALE| Customer|2022-02-04| 0| null|
| 1| PURCHASE| Seller|2022-02-03| 0| null|
| 2| PURCHASE| Customer|2022-02-11| 0| null|
+---+----------+----------+----------+----+--------+
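The same rule can also be expressed without the self-join, using a running max over a date-ordered window. This is only a sketch, assuming (as in the example) at most one row per id per date; last_sale_date and alt_df are names I introduced for illustration:
w = (
    Window.partitionBy("id")
    .orderBy("date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# Running max of the Customer-sale dates seen so far within each id;
# non-sale rows contribute null, which max() ignores
last_sale_date = F.max(
    F.when(
        (F.lower(F.col("order_type")) == "sale")
        & (F.lower(F.col("Initiative")) == "customer"),
        F.col("date"),
    )
).over(w)
alt_df = (
    df.withColumn("last_sale_date", last_sale_date)
    .withColumn(
        "flag",
        F.when(
            (F.lower(F.col("order_type")) == "purchase")
            & (F.lower(F.col("Initiative")) == "seller")
            # a null last_sale_date makes the comparison null, so flag falls to 0
            & (F.col("last_sale_date") < F.col("date")),
            F.lit(1),
        ).otherwise(F.lit(0)),
    )
    .withColumn(
        "day_diff",
        F.when(F.col("flag") == 1, F.datediff(F.col("date"), F.col("last_sale_date"))),
    )
    .drop("last_sale_date")
    .orderBy(F.col("id").asc(), F.col("date").desc())
)
alt_df.show()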