I have a df in the format of:
| name | status | date |
____________________________
| ben | active | 01/01 |
| ben | active | 01/02 |
| ben | active | 01/03 |
| ben | in-active | 01/04 |
| ben | in-active | 01/05 |
| ben | active | 01/06 |
| ben | active | 01/07 |
And I need to create a df formatted as:
| name | status | start_date | end_date |
____________________________________________
| ben | active | 01/01 | 01/03 |
| ben | in-active | 01/04 | 01/05 |
| ben | active | 01/06 | 01/07 |
I am having a hard time wrapping my head around the best way to do this.
There's some trickery involved in determining the very end of your status ranges, but this code should do what you want.
from pyspark.sql import types as T, functions as F, SparkSession, Window
import datetime
# Reuse the active Spark session if there is one; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Three columns, every one of them declared non-nullable.
schema = T.StructType(
    [
        T.StructField(column_name, column_type, False)
        for column_name, column_type in (
            ("name", T.StringType()),
            ("status", T.StringType()),
            ("date", T.DateType()),
        )
    ]
)

# One status per consecutive day, starting on 2021-01-01, all for "ben".
daily_statuses = [
    "active",
    "active",
    "inactive",
    "inactive",
    "active",
    "active",
    "active",
]
data = [
    {"name": "ben", "status": status, "date": datetime.date(year=2021, month=1, day=day)}
    for day, status in enumerate(daily_statuses, start=1)
]

df = spark.createDataFrame(data, schema)
df.show()
"""
+----+--------+----------+
|name| status| date|
+----+--------+----------+
| ben| active|2021-01-01|
| ben| active|2021-01-02|
| ben|inactive|2021-01-03|
| ben|inactive|2021-01-04|
| ben| active|2021-01-05|
| ben| active|2021-01-06|
| ben| active|2021-01-07|
+----+--------+----------+
"""
# Collapse consecutive rows that share a status into one row per run, carrying
# the run's first and last observed date ("gaps and islands").
#
# Steps, all evaluated per name in date order:
#   1. flag every row whose status differs from the previous row's status,
#   2. take a running sum of those flags so each run gets its own id,
#   3. group by that id and keep the min / max date of the run.
#
# Because the range ends come from min/max of dates that actually occur in the
# run, a run made of a single row (including one on the very last date, or a
# name with only one row) is kept with start_date == end_date, and missing
# calendar days never produce an end date that is absent from the data.
date_window = Window.partitionBy("name").orderBy("date")

# Same ordering, but with an explicit row-based frame from the first row of the
# partition up to the current row, used for the running sum below.
running_window = date_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)

flagged = df.select(
    "*",
    F.lag("status").over(date_window).alias("previous_status"),
).withColumn(
    "is_run_start",
    # eqNullSafe treats NULL as an ordinary value, so the first row of each
    # name (previous_status is NULL) is flagged as the start of a run.
    (~F.col("status").eqNullSafe(F.col("previous_status"))).cast("int"),
)

# For the sample data the flags are 1,0,1,0,1,0,0 and therefore the run ids
# are 1,1,2,2,3,3,3.
runs = flagged.withColumn(
    "run_id",
    F.sum("is_run_start").over(running_window),
)

corrected_end = (
    runs.groupBy("name", "run_id", "status")
    .agg(
        F.min("date").alias("start_date"),
        F.max("date").alias("end_date"),
    )
    # groupBy does not promise any row order, so sort explicitly.
    .orderBy("name", "start_date")
    .select("name", "status", "start_date", "end_date")
)
corrected_end.show()
"""
+----+--------+----------+----------+
|name| status|start_date| end_date|
+----+--------+----------+----------+
| ben| active|2021-01-01|2021-01-02|
| ben|inactive|2021-01-03|2021-01-04|
| ben| active|2021-01-05|2021-01-07|
+----+--------+----------+----------+
"""