I need to store netflow data in PostgreSQL. This is data about network traffic. Each record describes a single connection and contains the connection start and end times, the customer's source IP address, the remote AS number, and the total bytes and packets transferred.
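For concreteness, the raw flow table looks roughly like this (a sketch; the column names match the trigger code further down, and I assume the per-second rates are stored on the row, since the trigger reads them directly):

create table traffic_flow (
    id bigserial primary key,
    range tstzrange not null,           -- connection start/end times
    source_address inet not null,       -- the customer's IP
    as_number int not null,             -- the remote AS number
    bytes_per_second float not null,    -- total bytes / connection duration
    packets_per_second float not null   -- total packets / connection duration
);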
My question is this: How can I store this data so I can efficiently calculate data transfer rates for the past X days/hours? For example, I may want to draw a chart of all traffic to Netflix's ASN over the last 7 days, with hourly resolution.
The difference between the connection start & end times could be milliseconds, or could be over an hour.
My first pass at this would be to store the connection in a TSTZRANGE column with a GiST index. Then I would query the data for hourly traffic over the last 7 days by generating the hour buckets and joining each one against the flows that overlap it.
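A sketch of that query (assuming the traffic_flow layout above; AS 2906, Netflix's ASN, is just an example filter):

create index traffic_flow_range_idx on traffic_flow using gist (range);

select
    b.bucket,
    round(sum(
        f.bytes_per_second
        * extract(epoch from (upper(o.overlap) - lower(o.overlap)))
    )) as bytes
from generate_series(
        date_trunc('hour', now() - interval '7 days'),
        date_trunc('hour', now()),
        interval '1 hour'
    ) as b(bucket)
join traffic_flow f
    on f.range && tstzrange(b.bucket, b.bucket + interval '1 hour')
cross join lateral (
    -- Intersection of the flow with this hour bucket
    select f.range * tstzrange(b.bucket, b.bucket + interval '1 hour') as overlap
) o
where f.as_number = 2906
group by b.bucket
order by b.bucket;

The GiST index makes the && overlap test fast, but every flow row touching the window still has to be read and scaled.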
However, that sounds like a lot of heavy lifting. Can anyone think of a better option?
After looking into this some more, I think the real answer is that there isn't an out-of-the-box way to achieve this in a performant manner, especially as the data volume scales up. Ultimately it is just going to be slow to aggregate many thousands of rows at query time, because that is simply a lot of data access.
Instead I have gone a different route. I am using a PostgreSQL trigger on the table which stores the raw flows (traffic_flow). Every time a record is inserted into traffic_flow, the trigger upserts the new data into separate aggregation tables for daily, hourly, minutely, and per-second data.
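The ON CONFLICT upsert in the function below needs a unique constraint to target, so each aggregation table looks roughly like this (a sketch; the per-hour, per-minute, and per-second variants differ only in name):

create table traffic_perdaytraffic (
    timestamp timestamptz not null,  -- start of the aggregation bucket
    customer_ip inet not null,       -- 100.64.0.0 for the whole-network rows
    as_number int not null,
    bytes bigint not null default 0, -- bigint, since upserts keep accumulating
    packets bigint not null default 0,
    unique (customer_ip, timestamp, as_number)
);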
Here is my experimental implementation in case it is useful to someone. This could be improved to also handle updates and deletes.
create or replace function update_aggregated_traffic(NEW RECORD, table_name TEXT, interval_name TEXT, store_customer BOOLEAN)
    returns void
    language plpgsql
as
$body$
declare
    aggregate_interval INTERVAL;
    customer_ip_ INET;
begin
    -- Update the aggregated traffic data given the insertion of a new flow.
    -- A flow is the data about a single connection (start time, stop time, total
    -- bytes/packets). This function essentially rasterises that data into a
    -- series of aggregation buckets.

    -- interval_name should be second, minute, hour, or day

    -- Turn the interval_name into an actual INTERVAL
    aggregate_interval = ('1 ' || interval_name)::INTERVAL;

    if store_customer then
        customer_ip_ = NEW.source_address;
    else
        customer_ip_ = '100.64.0.0'::INET;
    end if;

    -- We need to insert into a dynamically generated table name. There is
    -- no way to do this without writing the whole SQL statement as a string.
    -- Instead, let's use a trick. Create a temporary view, then insert into that.
    -- Postgres will proxy this insert into the desired table
    drop view if exists table_pointer;
    execute format('create temporary view table_pointer as select * from %I', table_name);

    -- We use a CTE to keep things readable, even though it is pretty long
    with aggregate_range AS (
        -- Create all the aggregate buckets spanned by the inserted flow
        SELECT generate_series(
            date_trunc(interval_name, lower(NEW.range)),
            date_trunc(interval_name, upper(NEW.range)),
            aggregate_interval
        ) as range_lower
    ),
    -- For each bucket, figure out its overlap with the provided flow data.
    -- Only the first and last buckets will have less than complete overlap,
    -- but we do the calculation for all buckets anyway
    with_overlaps AS (
        SELECT
            NEW.range * tstzrange(range_lower, range_lower + aggregate_interval) AS overlap,
            range_lower
        FROM
            aggregate_range
    ),
    -- Convert the overlap intervals into seconds (FLOAT)
    with_overlap_seconds AS (
        SELECT
            extract(epoch from (upper(overlap) - lower(overlap))) as overlap_seconds,
            range_lower
        FROM
            with_overlaps
    )
    -- Now we have enough information to do the inserts
    insert into table_pointer as traffic
        (timestamp, customer_ip, as_number, bytes, packets)
    select
        range_lower,
        customer_ip_,
        NEW.as_number,
        -- Scale the packets/bytes per second up to a total number
        -- of packets/bytes for this bucket's overlap
        round(NEW.bytes_per_second * overlap_seconds)::INT,
        round(NEW.packets_per_second * overlap_seconds)::INT
    from with_overlap_seconds
    -- We shouldn't have any 0-second overlaps, but let's just be sure
    where overlap_seconds > 0
    -- If there is already existing data, then increment the bytes/packets values
    on conflict (customer_ip, timestamp, as_number) DO UPDATE SET
        bytes = EXCLUDED.bytes + traffic.bytes,
        packets = EXCLUDED.packets + traffic.packets;
end;
$body$;
create or replace function update_aggregated_traffic_hourly() returns trigger
    language plpgsql
as
$body$
begin
    -- Store aggregated data for different resolutions. For each we also store data
    -- without the customer information. This way we can efficiently see traffic data
    -- for the whole network
    PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic', 'day', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perdaytraffic', 'day', False);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic', 'hour', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perhourtraffic', 'hour', False);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic', 'minute', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_perminutetraffic', 'minute', False);
    PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic', 'second', True);
    PERFORM update_aggregated_traffic(NEW, 'traffic_persecondtraffic', 'second', False);
    return NEW;
end;
$body$;
create trigger update_aggregated_traffic_hourly_trigger AFTER INSERT ON traffic_flow
FOR EACH ROW EXECUTE PROCEDURE update_aggregated_traffic_hourly();
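As a quick sanity check, here is a hypothetical flow insert and what it produces (the values are made up; a 60-second connection straddling an hour boundary lands in two hourly buckets, each getting half the bytes and packets):

insert into traffic_flow (range, source_address, as_number, bytes_per_second, packets_per_second)
values (
    tstzrange('2020-04-18 09:59:30+00', '2020-04-18 10:00:30+00'),
    '192.0.2.10',
    2906,
    1000.0,   -- 60,000 bytes over 60 seconds
    10.0      -- 600 packets over 60 seconds
);

-- Expect two rows, one for the 09:00 bucket and one for 10:00,
-- each with 30000 bytes and 300 packets:
select * from traffic_perhourtraffic where customer_ip = '192.0.2.10' order by timestamp;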