Tags: postgresql, plpgsql, timescaledb, postgresql-triggers

Retaining and inserting state into a postgres aggregation function


I have a case where I have a table "tab" that stores data (keyed by symbol and time) as well as a second table "summ" that stores the exponential weighted running average of the data in the first table. I have it set up so that when data is added to the first table it triggers the calculation of the running average of the corresponding rows in the second table.

However, I can't figure out a good way to recover the state so that the running average continues where it left off. Instead, it starts anew each time the trigger is called. How do I save the state of the aggregation at the end of one trigger call and feed it back in as the initial state of the next?

I have a minimal example at https://dbfiddle.uk/6dqXCwIQ which is also reproduced below.

First making a table that will contain the unsmoothed data as well as a summary table that will hold the smoothed data. There is also a trigger to populate the second table given the first.

-- Raw (unsmoothed) observations, one row per symbol and timestamp.
CREATE TABLE tab (
    symbol    text,
    time_to   timestamptz,
    something integer,
    PRIMARY KEY (time_to, symbol)
);
-- Second unique index that leads with symbol (the primary key leads with time_to).
CREATE UNIQUE INDEX index_name2 ON tab (symbol, time_to);

/* Making a summarisation table too and a trigger to populate it when something is inserted into tab. */
-- State transition function for an exponentially weighted running average.
--   state  : two-element array [smoothed value so far, number of values seen];
--            NULL (or with a NULL first element) before the first value.
--   newval : the next observation.
--   frac   : weight given to newval; the previous smoothed value gets (1 - frac).
-- Returns the updated [smoothed value, count] array.
CREATE OR REPLACE FUNCTION smoother_state(state double precision[],
                                          newval double precision, frac double precision)
 RETURNS double precision[]
 LANGUAGE plpgsql
 IMMUTABLE PARALLEL SAFE LEAKPROOF
AS $function$
  begin
    -- A NULL observation must not touch the state: folding it in would set the
    -- smoothed value to NULL while still bumping the counter, and the next
    -- non-NULL row would then silently restart the average.
    if newval is null then
      return state;
    end if;

    return ARRAY[
      -- the first value seeds the average; later values are blended in
      case when state[1] is null then newval
           else state[1] * (1 - frac) + newval * frac
      end,
      -- the counter starts at 1 when there is no previous state
      coalesce(state[2] + 1, 1)
    ];
  END;
 $function$
;
-- Aggregate wrapper around smoother_state: folds (val, frac) pairs into a
-- [smoothed value, count] array. No initial condition is given, so the state
-- starts out as NULL. PostgreSQL ignores declared array sizes, so the state
-- type is written as a plain float8 array.
CREATE OR REPLACE AGGREGATE smoother(val float8, frac float8) (
    sfunc = smoother_state,
    stype = float8[]
);

-- Smoothed output, one row per symbol and timestamp, filled by the trigger on tab.
CREATE TABLE summ (
    symbol             text,
    time_to            timestamptz,
    smoothed_something float8,  -- first element of the smoother state
    number_of_periods  float8,  -- second element of the smoother state
    PRIMARY KEY (time_to, symbol)
);
-- Second unique index that leads with symbol (the primary key leads with time_to).
CREATE UNIQUE INDEX index_name3 ON summ (symbol, time_to);

/* Making a trigger.
I guess it should read the smoothed_something and number_of_periods values for the symbol
and use them as an initial state vector.
*/
-- Statement-level trigger function: smooths the rows of the transition table
-- "newtab" per symbol (ordered by time_to, frac = 0.3) and appends the results
-- to summ.
--
-- NOTE: the aggregate starts from an empty state on every call, so the running
-- average is NOT continued from rows already stored in summ.
--
-- The function is deliberately declared without PARALLEL SAFE, LEAKPROOF or
-- STRICT: it writes to summ, so it has side effects and must not be labelled
-- parallel safe or leakproof, and STRICT has no effect without arguments.
CREATE OR REPLACE FUNCTION do_update()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
BEGIN
    WITH smoothed AS (
        SELECT
            symbol,
            time_to,
            smoother(something, 0.3) OVER (PARTITION BY symbol ORDER BY time_to) AS smoo
        FROM newtab
    )
    INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods)
    SELECT
        symbol,
        time_to,
        smoo[1],  -- smoothed value
        smoo[2]   -- number of values folded in
    FROM smoothed;

    -- The return value of an AFTER ... FOR EACH STATEMENT trigger is ignored.
    RETURN NULL;
END;
$function$
;

-- Runs do_update() once per INSERT statement on tab, exposing the inserted rows
-- to the function as the transition table "newtab".
create trigger update_smoothed after
insert on tab
referencing new table as newtab
for each statement
execute function do_update();

After inserting data the first time, both tab and summ look correct.

-- First batch of sample rows for symbols a, b and c.
INSERT INTO tab (symbol, time_to, something)
VALUES
    ('a', TIMESTAMPTZ '2022-01-01 00:00:15+01:00', 15),
    ('b', TIMESTAMPTZ '2021-01-01 00:00:15+01:00', 18),
    ('b', TIMESTAMPTZ '2022-01-01 00:00:15+01:00', 13),
    ('b', TIMESTAMPTZ '2023-01-01 00:00:15+01:00', 11),
    ('b', TIMESTAMPTZ '2024-01-01 00:00:15+01:00', 3),
    ('c', TIMESTAMPTZ '2022-01-01 00:00:16+01:00', 15),
    ('c', TIMESTAMPTZ '2022-01-01 00:00:17+01:00', 150);

After inserting data the second time, tab looks correct but summ does not, because the aggregation starts again from a null state.

-- Second batch: later rows for symbols a and b, which already have history.
INSERT INTO tab (symbol, time_to, something)
VALUES
    ('a', TIMESTAMPTZ '2022-06-01 00:00:15+01:00', 150),
    ('a', TIMESTAMPTZ '2022-07-01 01:00:15+01:00', 170),
    ('b', TIMESTAMPTZ '2024-08-01 00:00:15+01:00', 180),
    ('b', TIMESTAMPTZ '2024-09-01 00:00:15+01:00', 130);

You can see the resultant data in each table below and also in the fiddle site.

(Image: the data in both tables, showing the erroneous rows in summ.)

For this problem:


Solution

  • An alternative, closer to the idea from the title: the function can accept an init_state parameter:
    demo at db<>fiddle

    -- State transition function for an exponentially weighted running average
    -- that can be seeded with a previously saved state.
    --   state      : [smoothed value so far, number of values seen]; NULL before
    --                the first row of the current aggregation.
    --   newval     : the next observation.
    --   frac       : weight given to newval; the previous value gets (1 - frac).
    --   init_state : optional [smoothed value, count] to continue from when
    --                state is still empty.
    -- Returns the updated [smoothed value, count] array.
    CREATE OR REPLACE FUNCTION smoother_state
       (state double precision[],
        newval double precision, 
        frac double precision,
        init_state double precision[] default null::float[] )
     RETURNS double precision[]
     LANGUAGE sql
     IMMUTABLE PARALLEL SAFE LEAKPROOF
     RETURN CASE
              -- A NULL observation must not touch the state: folding it in would
              -- set the smoothed value to NULL, and later rows would fall back
              -- to init_state (or restart) instead of continuing.
              WHEN newval IS NULL THEN coalesce(state, init_state)
              ELSE ARRAY[ -- prefer the running state, then the seed, then start fresh
                          coalesce(state[1] * (1-frac) + newval * frac, 
                                   init_state[1] * (1-frac) + newval * frac, 
                                   newval)
                         ,coalesce(state[2] + 1, 
                                   init_state[2] + 1,
                                   1)]
            END;
    -- Three-argument aggregate wrapper around smoother_state: folds
    -- (val, frac, init_state) rows into a [smoothed value, count] array.
    -- PostgreSQL ignores declared array sizes, so the state type is written
    -- as a plain float8 array.
    CREATE OR REPLACE AGGREGATE smoother(val float8,
                                         frac float8,
                                         init_state float8[]) (
        sfunc = smoother_state,
        stype = float8[]
    );
    

    You still need to look up the init state and inject it:

    -- Statement-level trigger function: smooths the rows of the transition
    -- table "newtab" per symbol, continuing from the most recent summ row of
    -- each symbol (if any), and appends the results to summ.
    --
    -- Declared without PARALLEL SAFE, LEAKPROOF or STRICT: the function writes
    -- to summ, so it has side effects and must not be labelled parallel safe or
    -- leakproof, and STRICT has no effect without arguments.
    CREATE OR REPLACE FUNCTION do_update()
     RETURNS trigger
    AS $function$ BEGIN
    WITH a AS (
      SELECT symbol, 
             time_to, 
             smoother(something, 0.3, init_state)OVER w AS smoo
      FROM newtab
      -- Latest saved state per symbol. Only symbols present in this statement
      -- are looked up, so summ is not scanned for unrelated symbols.
      LEFT JOIN(SELECT DISTINCT ON(s.symbol)
                       s.symbol,
                       array[s.smoothed_something,s.number_of_periods] AS init_state
                FROM summ AS s
                WHERE EXISTS (SELECT 1 FROM newtab AS n WHERE n.symbol = s.symbol)
                ORDER BY s.symbol, s.time_to DESC) AS init_states
      USING(symbol)
      WINDOW w AS(PARTITION BY symbol ORDER BY time_to)
    )
    INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods) 
    SELECT symbol, 
           time_to, 
           smoo[1] as smoothed_something, 
           smoo[2] as number_of_periods 
    FROM a;
    -- The return value of an AFTER ... FOR EACH STATEMENT trigger is ignored.
    RETURN null; END $function$ LANGUAGE plpgsql;
    
    symbol time_to smoothed_something number_of_periods
    a 2021-12-31 23:00:15+00 15 1
    b 2020-12-31 23:00:15+00 18 1
    b 2021-12-31 23:00:15+00 16.5 2
    b 2022-12-31 23:00:15+00 14.849999999999998 3
    b 2023-12-31 23:00:15+00 11.294999999999998 4
    c 2021-12-31 23:00:16+00 15 1
    c 2021-12-31 23:00:17+00 55.5 2
    a 2022-06-01 00:00:15+01 55.5 2
    a 2022-07-01 01:00:15+01 89.85 3
    b 2024-08-01 00:00:15+01 61.9065 5
    b 2024-09-01 00:00:15+01 82.33455000000001 6