postgresql stored-procedures user-defined-functions timescaledb retention

How do I create a scheduled user-defined action that trims rows in TimescaleDB?


I'm monitoring, collecting and storing event data for different organizations in my TimescaleDB.

I have a TimescaleDB instance with one database. Each organization has its own schema in that database, and each schema has two tables: a device table and an event table.

The device table stores information about the given organization's devices, while the event table is a time-series table that stores all the events collected from those devices. The event table has a foreign-key column that points to the device table.

This is how my database is structured:

β–Ό Servers (1)
  β–Ό TimescaleDB Cloud
    β–Ό πŸ›’ Databases (1)
      β–Ό πŸ›’ myTimescaleDB
        β–Ό Schemas (12)
          β–Ό organization_1 (12)
            β–Ό Tables (2)
              β–Ό device
                β–Ό Columns (3)
                    device_id
                    device_name
                    device_type
              β–Ό event
                β–Ό Columns (5)
                    event_id
                    time
                    device_id (fk)
                    event_source
                    event_type
          β–Ό organization_2 (12)
            β–Ό Tables (2)
              β–Ό device
                β–Ό Columns (3)
                    device_id
                    device_name
                    device_type
              β–Ό event
                β–Ό Columns (5)
                    event_id
                    time
                    device_id (fk)
                    event_source
                    event_type

I want to create a scheduled user-defined action that ensures each device has at most 10,000 events. Every - let's say - 5 minutes, the user-defined action should count the number of events per device in each schema. Any device with more than 10,000 events should be trimmed down to 10,000, always deleting the oldest events first. How do I do this?


Solution

  • Is your device sending data every X seconds? If so, you could also use a retention policy. Say each device sends a new event every 5 seconds: that's 12 per minute and 12 * 60 * 24 = 17,280 per day, so 10,000 events is around 58% of a day.

    You can do the math to convert your event cap into a number of hours and use a retention policy to remove the rest.

    Retention policies are, in the end, background jobs doing exactly this kind of removal. The only difference is that they're much more efficient, because they drop whole chunks instead of deleting individual rows. If a retention policy doesn't fit your case, you need a custom background job, which is what you asked about here.
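
    For example, with one event every 5 seconds, 10,000 events span roughly 10,000 * 5 s β‰ˆ 13.9 hours, so a retention policy of about 14 hours would approximate the same cap. A minimal sketch, assuming a hypertable named conditions:

    -- 10,000 events at one every 5 seconds β‰ˆ 50,000 s β‰ˆ 13.9 hours,
    -- so dropping chunks older than ~14 hours approximates the cap.
    -- "conditions" is an example hypertable name; use your own.
    SELECT add_retention_policy('conditions', drop_after => INTERVAL '14 hours');

    Note that this drops data by age, not by a per-device row count, so it only works well when all devices report at a similar rate.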

    To build the proper scenario, start with a query that validates what you want to delete. Here is a snippet using a window function to find the 10,000th-newest record per device (using an example hypertable named conditions with time and device columns):

    WITH summary AS (
        SELECT time,
               device,
               ROW_NUMBER() OVER(PARTITION BY device
                                     ORDER BY time DESC ) AS rank
          FROM conditions )
     SELECT *
       FROM summary
     WHERE rank = 10000;
    

    It would return something like:

    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”
    β”‚          time          β”‚ device β”‚ rank  β”‚
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€
    β”‚ 2000-01-07 21:17:05+00 β”‚      0 β”‚ 10000 β”‚
    β”‚ 2000-01-04 10:56:19+00 β”‚      1 β”‚ 10000 β”‚
    β”‚ 2000-01-04 11:37:45+00 β”‚      2 β”‚ 10000 β”‚
    β”‚ 2000-01-04 11:53:32+00 β”‚      3 β”‚ 10000 β”‚
    β”‚ 2000-01-04 11:42:57+00 β”‚      4 β”‚ 10000 β”‚
    β”‚ 2000-01-04 10:13:28+00 β”‚      5 β”‚ 10000 β”‚
    β”‚ 2000-01-04 11:30:52+00 β”‚      6 β”‚ 10000 β”‚
    β”‚ 2000-01-04 11:38:55+00 β”‚      7 β”‚ 10000 β”‚
    β”‚ 2000-01-04 11:46:30+00 β”‚      8 β”‚ 10000 β”‚
    

    Now it's just a matter of combining that query with a DELETE clause, removing everything older than each device's 10,000th-newest event:

    WITH summary AS (
        SELECT time,
               device,
               ROW_NUMBER() OVER (PARTITION BY device
                                      ORDER BY time DESC) AS rank
          FROM conditions)
    DELETE FROM conditions USING summary
     WHERE summary.rank = 10000
       AND conditions.time < summary.time
       AND summary.device = conditions.device;
    

    Confirming it worked as you expected:

    SELECT device, count(1) FROM conditions GROUP BY 1;
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”
    β”‚ device β”‚ count β”‚
    β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€
    β”‚      0 β”‚ 10000 β”‚
    β”‚      1 β”‚ 10000 β”‚
    β”‚      2 β”‚ 10000 β”‚
    β”‚      3 β”‚ 10000 β”‚
    β”‚      4 β”‚ 10000 β”‚
    β”‚      5 β”‚ 10000 β”‚
    β”‚      6 β”‚ 10000 β”‚
    

    And you can wrap it into a background job procedure:

    
    CREATE OR REPLACE PROCEDURE limit_devices_data(job_id int, config jsonb) LANGUAGE PLPGSQL AS
    $$
    BEGIN
      RAISE NOTICE 'DELETING in the job % with config %', job_id, config;
      WITH summary AS (
          SELECT time,
                 device,
                 ROW_NUMBER() OVER (PARTITION BY device
                                        ORDER BY time DESC) AS rank
            FROM conditions)
      DELETE FROM conditions USING summary
       WHERE summary.rank = 10000
         AND conditions.time < summary.time
         AND summary.device = conditions.device;
      COMMIT;
    END
    $$;
    

    And add the job to run every 5 minutes, or whatever interval suits your needs:

    SELECT add_job('limit_devices_data','5 minutes', initial_start => now() + INTERVAL '5 seconds');
    

    If you have a lot of data, you may need to increase the job's max_runtime:

    SELECT alter_job(job_id, max_runtime =>  INTERVAL '1 minute');
    
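    To check whether the job is keeping up, you can inspect its run history in the timescaledb_information.job_stats view (1000 below is a placeholder; use the job id returned by add_job):

    -- Show how the custom job has been running.
    SELECT job_id, last_run_status, last_run_duration, total_successes, total_failures
      FROM timescaledb_information.job_stats
     WHERE job_id = 1000;
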

    As you have several tables under different schemas, I'd recommend taking a look at EXECUTE combined with format() to iterate over the tables. You can query the hypertable names using the timescaledb_information.hypertables view.
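
    A sketch of that approach, assuming every organization schema has an event hypertable with time and device_id columns as described in the question (the table and column names are taken from your schema tree, so adjust as needed):

    -- Trim every "event" hypertable in every organization schema to
    -- 10,000 rows per device, using dynamic SQL.
    CREATE OR REPLACE PROCEDURE limit_devices_data(job_id int, config jsonb) LANGUAGE PLPGSQL AS
    $$
    DECLARE
      ht RECORD;
    BEGIN
      -- Find all hypertables named "event", regardless of schema.
      FOR ht IN
        SELECT hypertable_schema, hypertable_name
          FROM timescaledb_information.hypertables
         WHERE hypertable_name = 'event'
      LOOP
        -- %I safely quotes the schema and table identifiers.
        EXECUTE format(
          'WITH summary AS (
             SELECT time, device_id,
                    ROW_NUMBER() OVER (PARTITION BY device_id
                                           ORDER BY time DESC) AS rank
               FROM %I.%I)
           DELETE FROM %I.%I AS e USING summary
            WHERE summary.rank = 10000
              AND e.time < summary.time
              AND e.device_id = summary.device_id',
          ht.hypertable_schema, ht.hypertable_name,
          ht.hypertable_schema, ht.hypertable_name);
        COMMIT;  -- release locks between schemas
      END LOOP;
    END
    $$;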