
Storing ranged timeseries data in Postgres


I need to store NetFlow data (records of network traffic) in PostgreSQL. Each record contains the following:

My question is this: How can I store this data so I can efficiently calculate data transfer rates for the past X days/hours? For example, I may want to draw a chart of all traffic to Netflix's ASN over the last 7 days, with hourly resolution.

A connection's duration (end time minus start time) can be anything from a few milliseconds to over an hour, so a single record may span several time buckets.
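To make the difficulty concrete, here is a minimal sketch of the core calculation any approach has to perform: splitting one flow across fixed-width buckets in proportion to how much of the flow falls inside each bucket. It assumes the flow transferred data at a constant rate, and the helper name `split_flow_bytes` and its parameters are hypothetical, not part of the original post. For a made-up flow from 10:50 to 11:20 UTC at 1,000 bytes/s (1,800,000 bytes in total), it should attribute 600 s and 600,000 bytes to the 10:00 bucket and 1,200 s and 1,200,000 bytes to the 11:00 bucket.

```sql
-- Hypothetical helper (not from the original post): spread one flow's bytes over
-- fixed-width buckets, assuming the flow transferred data at a constant rate.
CREATE OR REPLACE FUNCTION split_flow_bytes(
    flow_range       tstzrange,  -- [connection start, connection end)
    bytes_per_second numeric,
    bucket_width     interval,   -- e.g. interval '1 hour'
    bucket_unit      text        -- matching date_trunc() unit, e.g. 'hour'
)
RETURNS TABLE (bucket_start timestamptz, overlap_seconds numeric, bucket_bytes numeric)
LANGUAGE sql STABLE
AS $$
    SELECT b,
           extract(epoch FROM upper(o.overlap) - lower(o.overlap))::numeric,
           round(bytes_per_second
                 * extract(epoch FROM upper(o.overlap) - lower(o.overlap))::numeric)
    -- Every bucket from the one holding the flow's start to the one holding its end
    FROM generate_series(date_trunc(bucket_unit, lower(flow_range)),
                         date_trunc(bucket_unit, upper(flow_range)),
                         bucket_width) AS b
    -- Range intersection (*) is the part of the flow that falls inside this bucket
    CROSS JOIN LATERAL (
        SELECT flow_range * tstzrange(b, b + bucket_width) AS overlap
    ) AS o
    -- A flow ending exactly on a bucket boundary yields an empty final overlap; skip it
    WHERE NOT isempty(o.overlap)
    ORDER BY b;
$$;

-- Example: a hypothetical 30-minute flow at 1,000 bytes/s
SET TimeZone = 'UTC';
SELECT * FROM split_flow_bytes(
    tstzrange('2024-01-01 10:50+00', '2024-01-01 11:20+00'),
    1000, interval '1 hour', 'hour');
```

Because the proportional split conserves bytes, summing `bucket_bytes` over all buckets gives back the flow's total (up to rounding).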


My first pass at this would be to store the connection in a TSTZRANGE column with a GiST index. Then, to query the data for hourly traffic over the last 7 days:

  1. Use a CTE to generate a sequence of hourly time buckets
  2. Find all TSTZRANGEs that overlap each bucket
  3. Calculate the duration of each overlap
  4. Calculate the record's data rate in bytes per second
  5. Multiply the overlap duration by the bytes per second to get the bytes transferred within the bucket
  6. Group by bucket, summing those byte totals
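The steps above could look roughly like the following query, shown as a sketch only. The `traffic_flow` column names (`range`, `source_address`, `as_number`, `bytes_per_second`, `packets_per_second`) are inferred from the trigger code in the answer and their types are assumptions; the wrapper function `hourly_bytes_for_asn` is a hypothetical name used so the query can be parameterised, and ASN 64500 is simply a value from the documentation-reserved range. The GiST index can speed up the `&&` (overlaps) test, but the per-bucket arithmetic still runs for every matching row at query time, which is the "heavy lifting" in question.

```sql
-- Assumed raw-flow table (column names inferred from the answer's trigger; types assumed)
CREATE TABLE IF NOT EXISTS traffic_flow (
    id                 bigserial        PRIMARY KEY,
    range              tstzrange        NOT NULL,  -- [connection start, connection end)
    source_address     inet             NOT NULL,
    as_number          integer          NOT NULL,
    bytes_per_second   double precision NOT NULL,
    packets_per_second double precision NOT NULL
);
-- GiST index so the && (overlaps) predicate can use an index scan
CREATE INDEX IF NOT EXISTS traffic_flow_range_gist ON traffic_flow USING gist (range);

-- Hypothetical wrapper: bytes per hourly bucket for one ASN between two instants
CREATE OR REPLACE FUNCTION hourly_bytes_for_asn(asn integer, from_ts timestamptz, to_ts timestamptz)
RETURNS TABLE (bucket_start timestamptz, total_bytes double precision)
LANGUAGE sql STABLE
AS $$
    -- Step 1: a CTE of hourly buckets
    WITH buckets AS (
        SELECT tstzrange(b, b + interval '1 hour') AS bucket
        FROM generate_series(date_trunc('hour', from_ts), to_ts, interval '1 hour') AS b
    )
    SELECT lower(bk.bucket),
           -- Steps 3-6: overlap seconds * rate, summed per bucket (0 when no flows)
           coalesce(sum(f.bytes_per_second
                        * extract(epoch FROM upper(bk.bucket * f.range)
                                            - lower(bk.bucket * f.range))), 0)
    FROM buckets AS bk
    -- Step 2: flows overlapping each bucket; the ASN filter lives in ON so the
    -- LEFT JOIN still returns buckets with no traffic
    LEFT JOIN traffic_flow AS f
           ON f.range && bk.bucket
          AND f.as_number = asn
    GROUP BY bk.bucket
    ORDER BY lower(bk.bucket);
$$;

-- Example: last 7 days, hourly, for an example ASN
SELECT * FROM hourly_bytes_for_asn(64500, now() - interval '7 days', now());
```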

However, that sounds like a lot of heavy lifting. Can anyone think of a better option?


Solution

  • After looking into this some more, I think the real answer is that there isn't an out-of-the-box way to do this performantly, especially as the data volume scales up. Ultimately, aggregating many thousands of rows at query time is just going to be slow, because that is simply a lot of data access.

    Instead I have gone a different route. I am using a PostgreSQL trigger on the table that stores the raw flows (traffic_flow). Every time a record is inserted into traffic_flow, the trigger upserts the new data into separate aggregation tables for daily, hourly, per-minute, and per-second data.
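The upsert in the function below uses `ON CONFLICT (customer_ip, timestamp, as_number)`, which only works if each aggregation table has a unique index on exactly those columns. A sketch of such tables follows; the column names come from the function's INSERT, while the types, the primary key, and the `bigint` counters are assumptions. `LIKE ... INCLUDING ALL` copies the primary key, so all four resolutions share one definition.

```sql
-- Hourly aggregate table; column names taken from the trigger's INSERT, types assumed
CREATE TABLE IF NOT EXISTS traffic_perhourtraffic (
    "timestamp" timestamptz NOT NULL,            -- start of the bucket
    customer_ip inet        NOT NULL,            -- 100.64.0.0 marks whole-network rows
    as_number   integer     NOT NULL,
    bytes       bigint      NOT NULL DEFAULT 0,  -- bigint leaves headroom for large daily totals
    packets     bigint      NOT NULL DEFAULT 0,
    -- Unique index matching the ON CONFLICT target used by the upsert
    PRIMARY KEY (customer_ip, "timestamp", as_number)
);

-- The other resolutions share the same shape; INCLUDING ALL also copies the primary key
CREATE TABLE IF NOT EXISTS traffic_perdaytraffic    (LIKE traffic_perhourtraffic INCLUDING ALL);
CREATE TABLE IF NOT EXISTS traffic_perminutetraffic (LIKE traffic_perhourtraffic INCLUDING ALL);
CREATE TABLE IF NOT EXISTS traffic_persecondtraffic (LIKE traffic_perhourtraffic INCLUDING ALL);
```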


    Here is my experimental implementation in case it is useful to someone. This could be improved to also handle updates and deletes.

    create or replace function update_aggregated_traffic(NEW RECORD, table_name TEXT, interval_name text, store_customer BOOLEAN)
        returns void
        language plpgsql
    as
    $body$
    declare
        aggregate_interval interval;
        customer_ip_ inet;
    begin
        -- Update the aggregated traffic data given the insertion of a new flow.
        -- A flow is the data about a single connection (start time, stop time, total
        -- bytes/packets). This function essentially rasterises that data into a
        -- series of aggregation buckets.
    
        -- interval_name should be day, hour, minute, or second: a unit accepted
        -- by both date_trunc() and interval input.
        -- Turn the interval_name into an actual INTERVAL
        aggregate_interval = ('1 ' || interval_name)::INTERVAL;
        if store_customer then
            customer_ip_ = NEW.source_address;
        else
            -- Placeholder address for the network-wide rows that carry no customer information
            customer_ip_ = '100.64.0.0'::INET;
        end if;
    
        -- We need to insert into a dynamically named table. Rather than writing the
        -- whole statement as a string for EXECUTE, let's use a trick: create a
        -- temporary view, then insert into that. Postgres proxies the insert
        -- (including ON CONFLICT) through to the underlying table.
        drop view if exists table_pointer;
        -- %I quotes table_name as an identifier (guards against SQL injection)
        execute format('create temporary view table_pointer as select * from %I', table_name);
    
        -- We use a CTE to keep things readable, even though it is pretty long
        with aggregate_range AS (
            -- Create all the aggregate buckets spanned by the inserted flow
            SELECT generate_series(
                date_trunc(interval_name, lower(NEW.range)),
                date_trunc(interval_name, upper(NEW.range)),
                aggregate_interval
            ) as range_lower
        ),
        -- For each bucket, figure out its overlap with the provided flow data.
        -- Only the first and last buckets will have less than complete overlap,
        -- but we do the calculation for all buckets anyway
        with_overlaps AS (
            SELECT
                NEW.range * tstzrange(range_lower, range_lower + aggregate_interval) AS overlap,
                range_lower
            FROM
                aggregate_range
        ),
        -- Convert the overlap intervals into seconds (FLOAT)
        with_overlap_seconds AS (
            SELECT
                extract(epoch from (upper(overlap) - lower(overlap))) as overlap_seconds,
                range_lower
            FROM
                with_overlaps
        )
        -- Now we have enough information to do the inserts
        insert into table_pointer as traffic
            (timestamp, customer_ip, as_number, bytes, packets)
            select
                range_lower,
                customer_ip_,
                NEW.as_number,
                -- Scale the packets/bytes per second to a total number of
                -- packets/bytes. BIGINT avoids INT overflow for large flows in
                -- long (e.g. daily) buckets.
                round(NEW.bytes_per_second * overlap_seconds)::BIGINT,
                round(NEW.packets_per_second * overlap_seconds)::BIGINT
            from with_overlap_seconds
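        -- A flow ending exactly on a bucket boundary gives an empty overlap for
        -- the last bucket; upper()/lower() of an empty range are NULL, so
        -- overlap_seconds is NULL and this filter drops it as well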
            where overlap_seconds > 0
            -- If there is already existing data, then increment the bytes/packets values
            on conflict (customer_ip, timestamp, as_number) DO UPDATE SET
                bytes = EXCLUDED.bytes + traffic.bytes,
                packets = EXCLUDED.packets + traffic.packets
        ;
    end;
    $body$;
    
    
    create or replace function update_aggregated_traffic_hourly() returns trigger
        language plpgsql
    as
    $body$
    begin
        -- Store aggregated data for different resolutions. For each we also store data
        -- without the customer information. This way we can efficiently see traffic data
        -- for the whole network
        PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic','day', True);
        PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic','day', False);
    
        PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic','hour', True);
        PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic','hour', False);
    
        PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic','minute', True);
        PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic','minute', False);
    
        PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic','second', True);
        PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic','second', False);
    
        return NEW;
    end;
    $body$;
    
    create trigger update_aggregated_traffic_hourly_trigger AFTER INSERT ON traffic_flow
        FOR EACH ROW EXECUTE PROCEDURE update_aggregated_traffic_hourly();
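With the aggregates in place, the chart from the question (hourly traffic to one ASN over the last 7 days) becomes a lookup of about 7 × 24 pre-computed rows instead of an aggregation over raw flows. A sketch, assuming the network-wide rows are keyed by the `100.64.0.0` placeholder exactly as the trigger writes them; `asn_hourly_chart` is a hypothetical name, and the `generate_series` join fills hours with no traffic with 0 so the chart has no gaps.

```sql
-- Hypothetical helper: hourly bytes for one ASN across the whole network,
-- read from the pre-aggregated hourly table; hours with no traffic return 0.
CREATE OR REPLACE FUNCTION asn_hourly_chart(asn integer, n_days integer DEFAULT 7)
RETURNS TABLE (bucket_start timestamptz, total_bytes bigint)
LANGUAGE plpgsql STABLE
AS $$
BEGIN
    RETURN QUERY
    SELECT h, coalesce(t.bytes, 0)::bigint
    -- One row per hour in the window, so missing hours show up as 0
    FROM generate_series(date_trunc('hour', now() - n_days * interval '1 day'),
                         date_trunc('hour', now()),
                         interval '1 hour') AS h
    LEFT JOIN traffic_perhourtraffic AS t
           ON t."timestamp" = h
          AND t.customer_ip = '100.64.0.0'::inet  -- network-wide rows written by the trigger
          AND t.as_number = asn
    ORDER BY h;
END;
$$;
```

A call such as `SELECT * FROM asn_hourly_chart(64500);` (64500 being an example ASN from the documentation range) would then return one row per hour for the past week.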