Tags: pyspark, palantir-foundry, foundry-code-repositories, foundry-python-transform

How do I compute a range of statuses from a daily indicator?


I have a df in the format of:

| name | status    | date  |
____________________________
| ben  | active    | 01/01 |
| ben  | active    | 01/02 |
| ben  | active    | 01/03 |
| ben  | in-active | 01/04 |
| ben  | in-active | 01/05 |
| ben  | active    | 01/06 |
| ben  | active    | 01/07 |

and I need to create a df formatted as:

| name | status    | start_date | end_date |
____________________________________________
| ben  | active    |   01/01    |   01/03  |
| ben  | in-active |   01/04    |   01/05  |
| ben  | active    |   01/06    |   01/07  |

I am having a hard time wrapping my head around the best way to do this


Solution

  • The tricky part is determining the very end of each status range, but this code should do what you want.

    from pyspark.sql import types as T, functions as F, SparkSession, Window
    import datetime
    spark = SparkSession.builder.getOrCreate()
    
    schema = T.StructType([
      T.StructField("name", T.StringType(), False),
      T.StructField("status", T.StringType(), False),
      T.StructField("date", T.DateType(), False),
    ])
    data = [
      {"name": "ben", "status": "active", "date": datetime.date(day=1, month=1, year=2021)},
      {"name": "ben", "status": "active", "date": datetime.date(day=2, month=1, year=2021)},
      {"name": "ben", "status": "inactive", "date": datetime.date(day=3, month=1, year=2021)},
      {"name": "ben", "status": "inactive", "date": datetime.date(day=4, month=1, year=2021)},
      {"name": "ben", "status": "active", "date": datetime.date(day=5, month=1, year=2021)},
      {"name": "ben", "status": "active", "date": datetime.date(day=6, month=1, year=2021)},
      {"name": "ben", "status": "active", "date": datetime.date(day=7, month=1, year=2021)},
    ]
    
    df = spark.createDataFrame(data, schema)
    
    df.show()
    
    """
    +----+--------+----------+
    |name|  status|      date|
    +----+--------+----------+
    | ben|  active|2021-01-01|
    | ben|  active|2021-01-02|
    | ben|inactive|2021-01-03|
    | ben|inactive|2021-01-04|
    | ben|  active|2021-01-05|
    | ben|  active|2021-01-06|
    | ben|  active|2021-01-07|
    +----+--------+----------+
    """
    
    # Classic "gaps and islands" approach.
    #
    # 1. Flag every row whose status differs from the previous row (the
    #    first row of each partition counts as a change too, because
    #    lag() is null there).
    # 2. A running sum of those flags assigns a distinct run_id to every
    #    contiguous run of identical statuses.
    # 3. groupBy(run) + min/max(date) collapses each run to one row.
    #
    # Unlike lead()-based boundary tricks, this handles a run that both
    # starts and ends on the last row of the partition (a single-day
    # range at the end), which would otherwise produce a null end_date
    # and be silently dropped.
    date_window = Window.partitionBy("name").orderBy("date")
    
    prev_status = F.lag("status").over(date_window)
    df = df.withColumn(
      "status_changed",
      F.when(prev_status.isNull() | (prev_status != F.col("status")), 1).otherwise(0),
    )
    
    # Running sum over the ordered window: every row in the same run
    # shares the same run_id.
    df = df.withColumn("run_id", F.sum("status_changed").over(date_window))
    
    result = (
      df.groupBy("name", "status", "run_id")
        .agg(
          F.min("date").alias("start_date"),
          F.max("date").alias("end_date"),
        )
        .orderBy("start_date")
        .drop("run_id")
    )
    
    result.show()
    
    """
    +----+--------+----------+----------+
    |name|  status|start_date|  end_date|
    +----+--------+----------+----------+
    | ben|  active|2021-01-01|2021-01-02|
    | ben|inactive|2021-01-03|2021-01-04|
    | ben|  active|2021-01-05|2021-01-07|
    +----+--------+----------+----------+
    """