pyspark window partitioning

Last value in a partition, ordered by a timestamp column, in PySpark


I need to create a "last_value" column which contains the last "value" within each partition, partitioning by "id" and ordering by "created_date".

Example of dataset:

id created_date value
123 2023-10-01 08:12:22 Not OK
123 2023-10-02 07:13:14 In Progress
123 2023-10-03 08:52:33 Document request
456 2023-10-01 11:34:01 OK

My desired output:

id created_date value last_value
123 2023-10-01 08:12:22 Not OK Document request
123 2023-10-02 07:13:14 In Progress Document request
123 2023-10-03 08:52:33 Document request Document request
456 2023-10-01 11:34:01 OK OK

I tried, without success, this code:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('id').orderBy('created_date')

df = df.withColumn('last_value', F.last('value').over(w))

Thank you!


Solution

  • You'd have to use .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) with the last() function. When a window has an orderBy but no explicit frame, Spark implicitly uses rangeBetween(Window.unboundedPreceding, Window.currentRow), so last() only ever sees rows up to the current one and simply returns the current row's value.

    Further Reading:

    1. https://docs.snowflake.com/en/sql-reference/functions-analytic
    2. https://docs.snowflake.com/en/sql-reference/functions/last_value

    The referenced documents are from Snowflake; however, the concepts are the same in Spark. Hope this helps.
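    To make the frame difference concrete, here is a minimal sketch of the two window specs side by side (the names w_default and w_full are only illustrative):

    from pyspark.sql.window import Window
    
    # What the question's code implicitly used: the default frame when an
    # orderBy is present, which stops at the current row.
    w_default = (
        Window.partitionBy("id")
        .orderBy("created_date")
        .rangeBetween(Window.unboundedPreceding, Window.currentRow)
    )
    
    # What last() needs here: an explicit frame covering the whole partition.
    w_full = (
        Window.partitionBy("id")
        .orderBy("created_date")
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )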

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import TimestampType
    
    spark = SparkSession.builder.getOrCreate()
    
    data = [
        (123, "2023-10-01 08:12:22", "Not OK"),
        (123, "2023-10-02 07:13:14", "In Progress"),
        (123, "2023-10-03 08:52:33", "Document request"),
        (456, "2023-10-01 11:34:01", "OK"),
    ]
    
    schema = ["id", "created_date", "value"]
    
    df = spark.createDataFrame(data, schema)
    # Cast the string dates to real timestamps so the orderBy is chronological
    df = df.withColumn('created_date', col('created_date').cast(TimestampType()))
    
    df.show()
    +---+-------------------+----------------+
    | id|       created_date|           value|
    +---+-------------------+----------------+
    |123|2023-10-01 08:12:22|          Not OK|
    |123|2023-10-02 07:13:14|     In Progress|
    |123|2023-10-03 08:52:33|Document request|
    |456|2023-10-01 11:34:01|              OK|
    +---+-------------------+----------------+
    
    from pyspark.sql.functions import last
    from pyspark.sql.window import Window
    
    # An explicit full-partition frame lets last() reach the final row of each id
    df = df.withColumn(
        "last_value",
        last("value").over(
            Window.partitionBy("id")
            .orderBy("created_date")
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        ),
    )
    
    df.show()
    +---+-------------------+----------------+----------------+
    | id|       created_date|           value|      last_value|
    +---+-------------------+----------------+----------------+
    |123|2023-10-01 08:12:22|          Not OK|Document request|
    |123|2023-10-02 07:13:14|     In Progress|Document request|
    |123|2023-10-03 08:52:33|Document request|Document request|
    |456|2023-10-01 11:34:01|              OK|              OK|
    +---+-------------------+----------------+----------------+
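    For contrast, this is what the original attempt computes. With the default frame, last() acts as a running last over the rows seen so far, so every row simply gets its own value back:

    from pyspark.sql.functions import last
    from pyspark.sql.window import Window
    
    # No rowsBetween: the frame ends at the current row, so last() returns
    # the current row's own value.
    df.select("id", "created_date", "value").withColumn(
        "running_last",
        last("value").over(Window.partitionBy("id").orderBy("created_date")),
    ).show()
    +---+-------------------+----------------+----------------+
    | id|       created_date|           value|    running_last|
    +---+-------------------+----------------+----------------+
    |123|2023-10-01 08:12:22|          Not OK|          Not OK|
    |123|2023-10-02 07:13:14|     In Progress|     In Progress|
    |123|2023-10-03 08:52:33|Document request|Document request|
    |456|2023-10-01 11:34:01|              OK|              OK|
    +---+-------------------+----------------+----------------+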