I am trying to calculate a cumulative timestamp difference over consecutive rows, grouped by the ID and STATUS columns.
Example dataframe:
ID | TIMESTAMP | STATUS |
---|---|---|
V1 | 2023-06-18 13:00:00 | 1 |
V1 | 2023-06-18 13:01:00 | 1 |
V1 | 2023-06-18 13:02:00 | 1 |
V1 | 2023-06-18 13:03:00 | 1 |
V1 | 2023-06-18 13:06:00 | 0 |
V1 | 2023-06-18 13:07:00 | 0 |
V1 | 2023-06-18 13:08:00 | 0 |
V1 | 2023-06-18 13:09:00 | 1 |
V1 | 2023-06-18 13:10:00 | 1 |
V1 | 2023-06-18 13:11:00 | 1 |
V1 | 2023-06-18 13:12:00 | 1 |
The expected result should look like this:
ID | TIMESTAMP | STATUS | TIMESTAMP_DIFF(in_seconds) |
---|---|---|---|
V1 | 2023-06-18 13:00:00 | 1 | 0 |
V1 | 2023-06-18 13:01:00 | 1 | 60 |
V1 | 2023-06-18 13:02:00 | 1 | 120 |
V1 | 2023-06-18 13:03:00 | 1 | 180 |
V1 | 2023-06-18 13:06:00 | 0 | 0 |
V1 | 2023-06-18 13:07:00 | 0 | 60 |
V1 | 2023-06-18 13:08:00 | 0 | 120 |
V1 | 2023-06-18 13:09:00 | 1 | 0 |
V1 | 2023-06-18 13:10:00 | 1 | 60 |
V1 | 2023-06-18 13:11:00 | 1 | 120 |
V1 | 2023-06-18 13:12:00 | 1 | 180 |
You can achieve this in PySpark using window functions. The idea is to create a group for consecutive rows with the same STATUS (partitioned by ID), then calculate the difference between the current TIMESTAMP and the first TIMESTAMP in each group.
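To see the grouping idea in isolation: flag every row whose STATUS differs from the previous row, then take a running sum of those flags, so each run of consecutive equal STATUS values gets the same group number. Below is a minimal plain-Python sketch of that logic (for illustration only, separate from the Spark code that follows):

```python
# Illustration: a running sum of "status changed" flags labels each consecutive run
statuses = [1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1]

groups = []
group = 0
for i, s in enumerate(statuses):
    if i > 0 and s != statuses[i - 1]:  # STATUS changed -> a new run starts
        group += 1
    groups.append(group)

print(groups)  # [0, 0, 0, 0, 1, 1, 1, 2, 2, 2, 2]
```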
Here’s how you can do it:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Sample data
data = [
("V1", "2023-06-18 13:00:00", 1),
("V1", "2023-06-18 13:01:00", 1),
("V1", "2023-06-18 13:02:00", 1),
("V1", "2023-06-18 13:03:00", 1),
("V1", "2023-06-18 13:06:00", 0),
("V1", "2023-06-18 13:07:00", 0),
("V1", "2023-06-18 13:08:00", 0),
("V1", "2023-06-18 13:09:00", 1),
("V1", "2023-06-18 13:10:00", 1),
("V1", "2023-06-18 13:11:00", 1),
("V1", "2023-06-18 13:12:00", 1),
]
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data, ["ID", "TIMESTAMP", "STATUS"])
# Convert TIMESTAMP to timestamp type
df = df.withColumn("TIMESTAMP", F.to_timestamp("TIMESTAMP"))
# Build a group ID for each run of consecutive STATUS values:
# flag rows where STATUS changes, then take a running sum of those flags
window_id = Window.partitionBy("ID").orderBy("TIMESTAMP")
df = df.withColumn("status_change", F.when(
    F.lag("STATUS").over(window_id) != F.col("STATUS"), 1
).otherwise(0))
df = df.withColumn(
    "group",
    F.sum("status_change").over(window_id.rowsBetween(Window.unboundedPreceding, Window.currentRow))
)
# First timestamp within each (ID, group) run
window_group = Window.partitionBy("ID", "group").orderBy("TIMESTAMP")
df = df.withColumn("first_ts", F.first("TIMESTAMP").over(window_group))

# Difference in seconds between the current row and the start of its run
df = df.withColumn(
    "TIMESTAMP_DIFF(in_seconds)",
    F.col("TIMESTAMP").cast("long") - F.col("first_ts").cast("long")
)

# Select the final columns (ordered for a stable display)
result = df.select("ID", "TIMESTAMP", "STATUS", "TIMESTAMP_DIFF(in_seconds)").orderBy("ID", "TIMESTAMP")
result.show(truncate=False)
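For reference, the same run grouping can also be derived with the classic "difference of row numbers" (gaps-and-islands) trick instead of the lag-plus-running-sum approach. This is only a sketch using the same `df` as above; note that with this variant a run is identified by (ID, STATUS, grp), not by (ID, grp) alone:

```python
# Alternative grouping: row_number over ID minus row_number over (ID, STATUS)
# is constant within each run of consecutive equal STATUS values
window_id = Window.partitionBy("ID").orderBy("TIMESTAMP")
window_status = Window.partitionBy("ID", "STATUS").orderBy("TIMESTAMP")

df_alt = df.withColumn(
    "grp", F.row_number().over(window_id) - F.row_number().over(window_status)
)

# The run key must include STATUS here, otherwise different runs could collide
window_run = Window.partitionBy("ID", "STATUS", "grp").orderBy("TIMESTAMP")
df_alt = df_alt.withColumn(
    "TIMESTAMP_DIFF(in_seconds)",
    F.col("TIMESTAMP").cast("long")
    - F.first("TIMESTAMP").over(window_run).cast("long")
)
```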
Also, keep in mind that to use PySpark you need a compatible Java runtime installed on your machine (Java 17 works with recent Spark releases). On Debian/Ubuntu Linux, you can use the following commands:
sudo apt update
sudo apt install openjdk-17-jdk
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
java -version
To install PySpark itself, use pip. The base package is enough for this example; the `sql` extra additionally pulls in optional dependencies such as pandas and PyArrow:
pip install pyspark
pip install "pyspark[sql]"
(The quotes around pyspark[sql] avoid shell globbing issues in zsh.)
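Once installed, a quick way to confirm that PySpark can find Java is to start a local session and print the version (a minimal sanity check, unrelated to the problem above):

```python
from pyspark.sql import SparkSession

# If Java is missing or misconfigured, this is the step that will fail
spark = SparkSession.builder.master("local[*]").appName("sanity-check").getOrCreate()
print(spark.version)
spark.stop()
```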