I have a case where I have a table "tab" that stores data (keyed by symbol and time) as well as a second table "summ" that stores the exponential weighted running average of the data in the first table. I have it set up so that when data is added to the first table it triggers the calculation of the running average of the corresponding rows in the second table.
I can't figure out a good way, however, to recover the state so that the running average continues from where it left off. Instead, it starts anew each time the trigger is called. How do I save the state of the aggregation at the end of one trigger call so that the next call can resume from it?
I have a minimal example at https://dbfiddle.uk/6dqXCwIQ which is also reproduced below.
First, I make a table that will contain the unsmoothed data, as well as a summary table that will hold the smoothed data. There is also a trigger that populates the second table from the first.
-- Raw (unsmoothed) observations: one row per (time_to, symbol).
CREATE TABLE tab (
    symbol    text,
    time_to   timestamp with time zone,
    something integer,
    PRIMARY KEY (time_to, symbol)
);

-- Second unique index with symbol as the leading column
-- (the primary key above leads with time_to). btree is the default method.
CREATE UNIQUE INDEX index_name2
    ON tab (symbol, time_to);
/* Making a summarisation table too and a trigger to populate it when something is inserted into tab. */
/*
 * State-transition function for the exponentially weighted running average.
 *
 * state  : two-element array [smoothed value, number of periods],
 *          or NULL before the first row has been processed.
 * newval : the next observation.
 * frac   : weight given to the new observation; the previous smoothed
 *          value is weighted by (1 - frac).
 *
 * Returns the updated [smoothed value, number of periods] array.
 *
 * A NULL observation keeps the previous smoothed value (the period is
 * still counted) instead of overwriting it with NULL, which would make
 * the following row restart the average from scratch.
 */
CREATE OR REPLACE FUNCTION smoother_state(state double precision[],
newval double precision, frac double precision)
RETURNS double precision[]
LANGUAGE plpgsql
IMMUTABLE PARALLEL SAFE LEAKPROOF
AS $function$
DECLARE
    prev  double precision := state[1];
    resul double precision;
BEGIN
    resul := CASE
        WHEN prev IS NULL   THEN newval   -- nothing accumulated yet: seed with the observation
        WHEN newval IS NULL THEN prev     -- missing observation: carry the average forward
        ELSE prev * (1 - frac) + newval * frac
    END;

    -- state[2] + 1 is NULL when there is no state yet, so the first period counts as 1.
    RETURN ARRAY[resul, coalesce(state[2] + 1, 1)];
END;
$function$
;
-- Aggregate that folds smoother_state over its input rows. There is no final
-- function, so the result is the state array itself.
-- (PostgreSQL ignores declared array sizes, so the state type is written
-- without one.)
CREATE OR REPLACE AGGREGATE smoother(val double precision, frac double precision) (
    STYPE = double precision[],
    SFUNC = smoother_state
);
-- Smoothed values derived from tab: one row per (time_to, symbol).
CREATE TABLE summ (
    symbol             text,
    time_to            timestamp with time zone,
    smoothed_something double precision,
    number_of_periods  double precision,
    PRIMARY KEY (time_to, symbol)
);

-- Second unique index with symbol as the leading column
-- (the primary key above leads with time_to). btree is the default method.
CREATE UNIQUE INDEX index_name3
    ON summ (symbol, time_to);
/* Making a trigger.
I guess it should read the smoothed_something and number_of_periods values for the symbol
and use them as an initial state vector.
*/
/*
 * Statement-level trigger function: smooths the rows of the transition
 * table "newtab" per symbol (ordered by time_to, weight 0.3) and appends
 * the results to summ.
 *
 * NOTE: the smoothing starts from an empty state on every statement;
 * rows already stored in summ are not consulted.
 *
 * The function writes to summ, so it must not be declared PARALLEL SAFE or
 * LEAKPROOF (both promise the absence of side effects / database writes).
 * STRICT is dropped as well: it has no meaning for a zero-argument function.
 *
 * Returns NULL; the function is meant for an AFTER ... FOR EACH STATEMENT trigger.
 */
CREATE OR REPLACE FUNCTION do_update()
RETURNS trigger
LANGUAGE plpgsql
VOLATILE PARALLEL UNSAFE
AS $function$
BEGIN
    WITH smoothed AS (
        SELECT
            symbol,
            time_to,
            smoother(something, 0.3) OVER (PARTITION BY symbol ORDER BY time_to) AS smoo
        FROM newtab
    )
    INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods)
    SELECT
        symbol,
        time_to,
        smoo[1],   -- smoothed value
        smoo[2]    -- number of periods
    FROM smoothed;

    RETURN NULL;
END;
$function$
;
-- After every INSERT statement on tab, call do_update() once; the inserted
-- rows are exposed to the function as the transition table "newtab".
CREATE TRIGGER update_smoothed
    AFTER INSERT ON tab
    REFERENCING NEW TABLE AS newtab
    FOR EACH STATEMENT
    EXECUTE FUNCTION do_update();
When inserting data the first time, both tab and summ look correct.
-- First batch of raw rows for symbols a, b and c.
INSERT INTO tab (symbol, time_to, something)
VALUES
    ('a', timestamptz '2022-01-01 00:00:15+01:00', 15),
    ('b', timestamptz '2021-01-01 00:00:15+01:00', 18),
    ('b', timestamptz '2022-01-01 00:00:15+01:00', 13),
    ('b', timestamptz '2023-01-01 00:00:15+01:00', 11),
    ('b', timestamptz '2024-01-01 00:00:15+01:00', 3),
    ('c', timestamptz '2022-01-01 00:00:16+01:00', 15),
    ('c', timestamptz '2022-01-01 00:00:17+01:00', 150);
When inserting data the second time, tab looks correct but summ does not, because the aggregation starts from a null state.
-- Second batch: later rows for symbols a and b, which already have rows in tab.
INSERT INTO tab (symbol, time_to, something)
VALUES
    ('a', timestamptz '2022-06-01 00:00:15+01:00', 150),
    ('a', timestamptz '2022-07-01 01:00:15+01:00', 170),
    ('b', timestamptz '2024-08-01 00:00:15+01:00', 180),
    ('b', timestamptz '2024-09-01 00:00:15+01:00', 130);
You can see the resultant data in each table below and also in the fiddle site.
For this problem:
An alternative, closer to the idea from the title: the function can accept an init_state
parameter:
demo at db<>fiddle
/*
 * State-transition function for the exponentially weighted running average,
 * with an optional starting state.
 *
 * state      : [smoothed value, number of periods] accumulated so far,
 *              or NULL before the first row has been processed.
 * newval     : the next observation.
 * frac       : weight given to the new observation; the previous smoothed
 *              value is weighted by (1 - frac).
 * init_state : [smoothed value, number of periods] to continue from; each
 *              element is used only while the corresponding element of
 *              state is NULL.
 *
 * Returns the updated [smoothed value, number of periods] array.
 *
 * A NULL observation keeps the previous smoothed value (the period is still
 * counted). Previously it produced NULL, after which the next row fell back
 * to init_state and discarded everything accumulated in between.
 */
CREATE OR REPLACE FUNCTION smoother_state
(state double precision[],
newval double precision,
frac double precision,
init_state double precision[] DEFAULT NULL::double precision[] )
RETURNS double precision[]
LANGUAGE sql
IMMUTABLE PARALLEL SAFE LEAKPROOF
RETURN ARRAY[
    coalesce(
        -- regular blend of the previous value with the new observation
        coalesce(state[1], init_state[1]) * (1 - frac) + newval * frac,
        -- no previous value (or no usable weight): start from the observation
        newval,
        -- no observation: carry the previous value forward
        coalesce(state[1], init_state[1])
    ),
    -- period counter continues from state, else from init_state, else starts at 1
    coalesce(state[2], init_state[2], 0) + 1
];
-- Three-argument variant of the aggregate: init_state is handed to
-- smoother_state on every row. There is no final function, so the result is
-- the state array itself.
-- (PostgreSQL ignores declared array sizes, so the state type is written
-- without one.)
CREATE OR REPLACE AGGREGATE smoother(
    val        double precision,
    frac       double precision,
    init_state double precision[]
) (
    STYPE = double precision[],
    SFUNC = smoother_state
);
You still need to look up the init state and inject it:
/*
 * Statement-level trigger function: smooths the rows of the transition
 * table "newtab" per symbol (ordered by time_to, weight 0.3), continuing
 * from the most recent row already stored in summ for that symbol, and
 * appends the results to summ.
 *
 * The function writes to summ, so it must not be declared PARALLEL SAFE or
 * LEAKPROOF (both promise the absence of side effects / database writes).
 * STRICT is dropped as well: it has no meaning for a zero-argument function.
 *
 * Returns NULL; the function is meant for an AFTER ... FOR EACH STATEMENT trigger.
 */
CREATE OR REPLACE FUNCTION do_update()
RETURNS trigger
LANGUAGE plpgsql
VOLATILE PARALLEL UNSAFE
AS $function$
BEGIN
    WITH a AS (
        SELECT symbol,
               time_to,
               smoother(something, 0.3, init_state) OVER w AS smoo
        FROM newtab
        -- Latest stored [smoothed value, number of periods] per symbol;
        -- symbols with no row in summ get a NULL init_state.
        LEFT JOIN (
            SELECT DISTINCT ON (s.symbol)
                   s.symbol,
                   ARRAY[s.smoothed_something, s.number_of_periods] AS init_state
            FROM summ AS s
            -- only symbols in this batch can match the join, so skip the rest
            WHERE s.symbol IN (SELECT n.symbol FROM newtab AS n)
            ORDER BY s.symbol, s.time_to DESC
        ) AS init_states
            USING (symbol)
        WINDOW w AS (PARTITION BY symbol ORDER BY time_to)
    )
    INSERT INTO summ (symbol, time_to, smoothed_something, number_of_periods)
    SELECT symbol,
           time_to,
           smoo[1] AS smoothed_something,
           smoo[2] AS number_of_periods
    FROM a;

    RETURN NULL;
END
$function$;
symbol | time_to | smoothed_something | number_of_periods |
---|---|---|---|
a | 2021-12-31 23:00:15+00 | 15 | 1 |
b | 2020-12-31 23:00:15+00 | 18 | 1 |
b | 2021-12-31 23:00:15+00 | 16.5 | 2 |
b | 2022-12-31 23:00:15+00 | 14.849999999999998 | 3 |
b | 2023-12-31 23:00:15+00 | 11.294999999999998 | 4 |
c | 2021-12-31 23:00:16+00 | 15 | 1 |
c | 2021-12-31 23:00:17+00 | 55.5 | 2 |
a | 2022-06-01 00:00:15+01 | 55.5 | 2 |
a | 2022-07-01 01:00:15+01 | 89.85 | 3 |
b | 2024-08-01 00:00:15+01 | 61.9065 | 5 |
b | 2024-09-01 00:00:15+01 | 82.33455000000001 | 6 |