For a fluentd environment I'm trying to enrich our logs (they contain IP addresses) with contextual information. For this purpose I've written a fluentd plugin in Ruby.
I have a file in JSON format containing subnet information, as well as meta information about each subnet (country and internal site_id, for example). The idea is to write a filter plugin that matches the source and destination IP addresses against the networks in this file and adds the appropriate metadata to the record.
The current code looks like this (I've removed some comments and error handling to keep it as short as possible):
require "fluent/plugin/filter"
require "ipaddr"
require "json"

module Fluent
  module Plugin
    class IpaddressFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter("ipaddress", self)

      config_param :ipaddress_file_path, :string
      config_param :source_address_field, :string, :default => "src_ip"
      config_param :destination_address_field, :string, :default => "dest_ip"

      def initialize
        super
        @parsed_subnets = []
      end

      def configure(conf)
        super
        @ipaddress_file = File.read(@ipaddress_file_path)
        @ipaddress_data = JSON.parse(@ipaddress_file)
        # Parse each entry's network once, at load time
        @ipaddress_data.each do |entry|
          new_entry = {
            "network" => IPAddr.new(entry["network"], Socket::AF_INET),
            "country" => entry["country"],
            "site_id" => entry["site_id"]
          }
          # Append hash to array
          @parsed_subnets << new_entry
        end
      end

      def filter(tag, time, record)
        src_ip_obj = IPAddr.new(record[@source_address_field], Socket::AF_INET)
        dest_ip_obj = IPAddr.new(record[@destination_address_field], Socket::AF_INET)
        src_found = false
        dst_found = false
        # Check if IP addresses are in any of the subnets (linear scan)
        @parsed_subnets.each do |entry|
          # SRC IP
          if entry["network"].include?(src_ip_obj)
            record["src_country"] = entry["country"]
            record["src_site_id"] = entry["site_id"]
            src_found = true
          end
          # DEST IP
          if entry["network"].include?(dest_ip_obj)
            record["dest_country"] = entry["country"]
            record["dest_site_id"] = entry["site_id"]
            dst_found = true
          end
          # Stop loop if both are found
          break if src_found && dst_found
        end
        # Return record
        record
      end
    end
  end
end
The code itself works just fine. However, the subnet list has more than 20,000 entries, and we're processing more than 2,000 log entries per second. The current solution scales linearly, O(n), with the number of subnet entries, which is far from optimal. The subnets are already summarised (using Python's netaddr module) to the largest extent possible without losing uniqueness.
The question now is how to improve the speed of this task. I was thinking that a tree-based approach could potentially work. Anything I can do upfront (while the plugin is loading the data) is a one-time cost, which would be far preferable to doing it for each message.
It is likely that the @parsed_subnets.each block is slowing down your code, because it iterates over all subnets only to find two matches.
Instead of iterating over all subnets, I suggest writing them into a database that supports IP address operators (like PostgreSQL) and then querying only for the two values you are interested in.
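Loading the subnet file into such a database is a one-time step. The following is only a sketch of how that could be done: the table name `ipam_networks` and its columns mirror the query used further down, while the column types, the index name, the GiST index with `inet_ops`, and the helper `load_subnets` are assumptions for illustration. `conn` is expected to behave like a `PG::Connection` from the pg gem.

```ruby
require "json"

# Hypothetical one-off loader, not part of the plugin.
# `conn` is assumed to respond to #exec, #prepare and #exec_prepared,
# as a PG::Connection does.
def load_subnets(conn, json_text)
  # Column types are assumptions; site_id is stored as text here.
  conn.exec(<<~SQL)
    CREATE TABLE IF NOT EXISTS ipam_networks (
      network cidr NOT NULL,
      country text,
      site_id text
    )
  SQL
  # A GiST index with the inet_ops operator class lets PostgreSQL
  # use the index for containment operators such as >> and >>=.
  conn.exec(<<~SQL)
    CREATE INDEX IF NOT EXISTS ipam_networks_network_idx
      ON ipam_networks USING gist (network inet_ops)
  SQL
  conn.prepare(
    "insert_network",
    "INSERT INTO ipam_networks (network, country, site_id) VALUES ($1, $2, $3)"
  )
  entries = JSON.parse(json_text)
  entries.each do |entry|
    conn.exec_prepared(
      "insert_network",
      [entry["network"], entry["country"], entry["site_id"]&.to_s]
    )
  end
  # Return the number of rows inserted
  entries.size
end
```

With the pg gem installed, this could be called once as `load_subnets(PG.connect(dbname: "fluentd"), File.read(path_to_subnet_file))`, where `path_to_subnet_file` stands for the JSON file from the question.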
The filter plugin itself could then look roughly like this:
require "fluent/plugin/filter"
require "ipaddr"
require "pg"

module Fluent
  module Plugin
    class IpaddressFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter("ipaddress", self)

      desc 'Defines source IP field name within the record'
      config_param :source_address_field, :string, :default => "src_ip"
      desc 'Defines destination IP field name within the record'
      config_param :destination_address_field, :string, :default => "dest_ip"
      config_param :psql_host, :string, :default => "localhost"
      config_param :psql_user, :string, :default => "postgres"
      config_param :psql_pass, :string, :default => "postgres"
      config_param :psql_schema, :string, :default => "fluentd"
      config_param :psql_port, :integer, :default => 5432

      def configure(conf)
        super
        @db_conn = PG.connect(
          :host => @psql_host,
          :user => @psql_user,
          :password => @psql_pass,
          :dbname => @psql_schema,
          :port => @psql_port
        )
        # Prepare the lookup once; >>= ("contains or equals") also matches
        # single-host entries (/32), which >> ("strictly contains") would skip
        @db_conn.prepare('get_network', 'SELECT country, site_id FROM ipam_networks WHERE network >>= $1')
      end

      def filter(tag, time, record)
        # fetch info from db - source
        @db_conn.exec_prepared('get_network', [record[@source_address_field]]) do |result|
          result.each do |row|
            record["src_country"] = row["country"]
            record["src_site_id"] = row["site_id"]
          end
        end
        # fetch info from db - destination
        @db_conn.exec_prepared('get_network', [record[@destination_address_field]]) do |result|
          result.each do |row|
            record["dest_country"] = row["country"]
            record["dest_site_id"] = row["site_id"]
          end
        end
        record
      end
    end
  end
end
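If a database round trip for every record turns out to be too costly, the same lookup can also be done in memory. The following is only a sketch, assuming IPv4 subnets that do not overlap (the question says they are summarised and unique, but that is worth verifying). `SubnetIndex` and its methods are hypothetical names, not part of fluentd. The ranges are sorted once at load time and then searched with a binary search, so each lookup is O(log n) instead of O(n).

```ruby
require "ipaddr"

# Hypothetical helper: an index over non-overlapping IPv4 subnets.
class SubnetIndex
  Entry = Struct.new(:low, :high, :meta)

  # entries: array of hashes with "network", "country" and "site_id" keys,
  # as parsed from the JSON file.
  def initialize(entries)
    @ranges = entries.map do |entry|
      range = IPAddr.new(entry["network"]).to_range
      # Store the first and last address of each subnet as integers
      Entry.new(range.first.to_i, range.last.to_i, entry)
    end
    # One-time cost: sort by start address so bsearch can be used
    @ranges.sort_by!(&:low)
  end

  # Returns the matching entry hash, or nil if no subnet contains the address
  # or the address cannot be parsed.
  def lookup(ip)
    addr = IPAddr.new(ip).to_i
    # First range whose last address is >= addr; because the ranges do not
    # overlap, it is the only one that can contain addr.
    hit = @ranges.bsearch { |r| r.high >= addr }
    hit && hit.low <= addr ? hit.meta : nil
  rescue IPAddr::Error
    nil
  end
end
```

Inside the plugin, the index would be built once in `configure` and `lookup` called twice in `filter`, once for the source and once for the destination address.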