sql, dataframe, apache-spark-sql, time-series

SQL to Retrieve, for all Rows, the Sum of Values of Previous Rows By Time Series


My SQL is rusty. I have a DataFrame/table like this (partially shown, and this is just a sample, not the real data):

+--------------------+-----+---+
|           timestamp|value| id|
+--------------------+-----+---+
|2024-10-05 20:38:...|   67|  0|
|2024-10-05 19:38:...|   14|  1|
|2024-10-05 18:38:...|   80|  2|
|2024-10-05 17:38:...|    6|  3|
+--------------------+-----+---+

What I want to do, in plain English, is this: suppose an id is in row 0. For that id, I want to return the sum of the `value` column over all rows whose timestamp falls within the 3 hours before row 0's timestamp, including row 0's own value. Then I want to do the same for every row, 0 through n (where n could be large, on the order of hundreds of millions of rows).

So, my output would look like this (truncated a bit):

+--------------------+-----+---+
|           timestamp|sum  | id|
+--------------------+-----+---+
|2024-10-05 20:38:...|  167|  0| /* this result is the `value` of the id in row 0, plus the previous three hours of data */
|2024-10-05 19:38:...|  100|  1| /* this would be more than 100 with the full dataset, etc. */
|2024-10-05 18:38:...|   86|  2|
+--------------------+-----+---+ /* etc etc */
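To pin down the intended semantics, here is a minimal brute-force reference in plain Python using the four sample rows. The names `rows` and `trailing_sums` are hypothetical, and because the seconds are truncated in the displayed table, the sketch assumes they are all `:00`. It treats the window as `[ts - 3 hours, ts]` with both ends inclusive, which is what the expected sums above require (row 3 at 17:38 is exactly 3 hours before row 0 and is counted in 167).

```python
from datetime import datetime, timedelta

# Sample rows from the question as (timestamp, value, id).
# ASSUMPTION: seconds were truncated in the displayed table, so every
# timestamp here uses :00 seconds purely for illustration.
rows = [
    (datetime(2024, 10, 5, 20, 38), 67, 0),
    (datetime(2024, 10, 5, 19, 38), 14, 1),
    (datetime(2024, 10, 5, 18, 38), 80, 2),
    (datetime(2024, 10, 5, 17, 38), 6, 3),
]

WINDOW = timedelta(hours=3)


def trailing_sums(rows, window=WINDOW):
    """For each row, sum `value` over all rows whose timestamp lies in
    [ts - window, ts], both ends inclusive (so the row itself is counted).

    This is the O(n^2) brute-force definition, useful only as a reference
    for checking faster implementations on small samples.
    """
    result = []
    for ts, _, row_id in rows:
        total = sum(v for other_ts, v, _ in rows if ts - window <= other_ts <= ts)
        result.append((ts, total, row_id))
    return result


for ts, total, row_id in trailing_sums(rows):
    print(ts, total, row_id)
```

On these four rows it yields 167, 100, 86 and 6 for ids 0 to 3; with the full dataset, ids 1 and 2 would pick up additional earlier rows.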

I'm sure this question or a variant has been asked, but I've done a ton of research and I can't seem to find this. To be more specific, I'm working with Spark DataFrames, but vanilla SQL is also fine.


Solution

  • In SQLite you could do

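    -- For each row d, sum value over every row whose timestamp lies in
    -- [d.timestamp - 3 hours, d.timestamp]; BETWEEN is inclusive, so d itself counts.
    -- Assumes timestamps are stored as 'YYYY-MM-DD HH:MM:SS' text, so comparing
    -- them as strings against datetime(...) orders them chronologically.
    SELECT
      d.timestamp,
      d.value,
      d.id,
      (
        SELECT
          SUM(di.value)
        FROM data di
        WHERE
          di.timestamp BETWEEN datetime(d.timestamp, '-3 hours') AND d.timestamp
      ) AS sum
    FROM data d;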
    

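    I'm not very familiar with Spark, but searching for Spark DataFrames and "window functions" turns up promising results.

    Something along the lines of

    Window.partitionBy().orderBy(col("timestamp").cast("long")).rangeBetween(-10800, 0)

    seems the way to go. Casting the timestamp to long gives seconds since the epoch, so -10800 means 3 hours (3 × 3600 s) before the current row and 0 means the current row, with both boundaries inclusive. Note that partitionBy() with no columns moves all rows into a single partition, which can be slow at hundreds of millions of rows.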
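Both ideas can be checked side by side without a Spark cluster using Python's built-in `sqlite3` module. This is a minimal sketch, not the Spark code itself: the window query swaps in SQLite's own `RANGE BETWEEN 10800 PRECEDING AND CURRENT ROW` frame as an analogue of `rangeBetween(-10800, 0)`, ordering by epoch seconds from `strftime('%s', ...)`. It assumes SQLite 3.28 or newer (needed for numeric `RANGE` offsets), timestamps stored as `'YYYY-MM-DD HH:MM:SS'` text, and `:00` seconds for the truncated sample values; the `CORRELATED` and `WINDOWED` names are hypothetical.

```python
import sqlite3

# In-memory table named `data`, as in the answer, holding the sample rows.
# ASSUMPTION: seconds are :00 because they were truncated in the question.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (timestamp TEXT, value INTEGER, id INTEGER)")
conn.executemany(
    "INSERT INTO data VALUES (?, ?, ?)",
    [
        ("2024-10-05 20:38:00", 67, 0),
        ("2024-10-05 19:38:00", 14, 1),
        ("2024-10-05 18:38:00", 80, 2),
        ("2024-10-05 17:38:00", 6, 3),
    ],
)

# The answer's correlated subquery: one range lookup per outer row.
CORRELATED = """
SELECT d.id,
       (SELECT SUM(di.value)
          FROM data di
         WHERE di.timestamp BETWEEN datetime(d.timestamp, '-3 hours') AND d.timestamp
       ) AS total
  FROM data d
 ORDER BY d.id
"""

# Window-function analogue of rangeBetween(-10800, 0): order by epoch seconds
# and sum over a RANGE frame covering the previous 10800 s plus the current row.
WINDOWED = """
SELECT id,
       SUM(value) OVER (
         ORDER BY CAST(strftime('%s', timestamp) AS INTEGER)
         RANGE BETWEEN 10800 PRECEDING AND CURRENT ROW
       ) AS total
  FROM data
 ORDER BY id
"""

correlated = conn.execute(CORRELATED).fetchall()
windowed = conn.execute(WINDOWED).fetchall()
print(correlated)
print(windowed)
```

Both queries should return the same per-id totals. The window version sorts once and slides the frame along the ordered rows, while the correlated subquery performs a separate lookup for every row, which is why the window approach is usually preferred at large scale.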