rubyfluentd

How to match IP addresses in a large list of IP network objects in Ruby


For a fluentd environment I'm trying to enrich our logs (they contain IP addresses) with contextual information. For this purpose I've written a fluentd plugin based on Ruby.

I have a file containing subnet information, as well as meta information about each subnet (country and internal site_id for example) in JSON format. The idea now is to write a filter plugin, match the src and dst IP address against the networks in this file and add the appropriate metadata to the record.

The current code looks like this (I've removed some comments and error handling to keep it as short as possible):

require "fluent/plugin/filter"
require "ipaddr"

module Fluent
  module Plugin
    class IpaddressFilter < Fluent::Plugin::Filter
      Fluent::Plugin.register_filter("ipaddress", self)

      config_param :ipaddress_file_path, :string
      config_param :source_address_field, :string, :default => "src_ip"
      config_param :destination_address_field, :string, :default => "dest_ip"

      def initialize
        super
        @parsed_subnets = []
      end

      def configure(conf)
        super
        @ipaddress_file = File.read(@ipaddress_file_path)
        @ipaddress_data = JSON.parse(@ipaddress_file)

        # For each entry, create IP address object
        @ipaddress_data.each do |entry|
          # Create IP address object
          new_entry = {
            "network" => IPAddr.new(entry["network"], family = Socket::AF_INET),
            "country" => entry["country"],
            "site_id" => entry["site_id"]
          }
          # Append hash to array
          @parsed_subnets << new_entry
        end
      end

      def filter(tag, time, record)
        src_ip_obj = IPAddr.new(record[@source_address_field], family = Socket::AF_INET)
        dest_ip_obj = IPAddr.new(record[@destination_address_field], family = Socket::AF_INET)

        src_found = false
        dst_found = false

        # Check if IP addresses are in any of the subnets
        @parsed_subnets.each do |entry|
          # SRC IP
          if entry["network"].include?(src_ip_obj)
            record["src_country"] = entry["country"]
            record["src_site_id"] = entry["site_id"]
            src_found = true
          end
          # DEST IP
          if entry["network"].include?(dest_ip_obj)
            record["dest_country"] = entry["country"]
            record["dest_site_id"] = entry["site_id"]
            dst_found = true
          end

          # Stop loop if both are found
          if src_found & dst_found
            break
          end
        end

        # Return record
        record
      end
    end
  end
end

The code itself works just fine. The subnet list has more than 20.000 entries though, and we're processing more than 2000 log entries per second. The current solution scales with the number of subnet entries in a linear way O(n), which is far from being optimal. The subnets are already summarised (python's netaddr module) to the largest extend possible, without loosing being unique.

Question now is how to improve the speed of this task? I was thinking that a tree-based approach could potentially work. Anything I can do upfront (while the plugin is loading the data) is a one time cost, which would be absolutely preferred compared to doing it for each message.


Solution

  • It is likely that the @parsed_subnets.each block is slowing down your code, because it iterates all subnets only to find two matches.

    Instead of iterating all subnets, I suggest writing all the subnets into a database which supports IP address operators (like PostgreSQL) and then only to query for those two values you are interested in.

    A potential solution could look roughly like this:

    require "fluent/plugin/filter"
    require "ipaddr"
    require "pg"
    
    module Fluent
      module Plugin
        class IpaddressFilter < Fluent::Plugin::Filter
          Fluent::Plugin.register_filter("ipaddress", self)
          
          desc 'Defines source IP field name within the record'
          config_param :source_address_field, :string, :default => "src_ip"
    
          desc 'Defines destination IP field name within the record'
          config_param :destination_address_field, :string, :default => "dest_ip"
    
          config_param :psql_host, :string, :default => "localhost"
          config_param :psql_user, :string, :default => "postgres"
          config_param :psql_pass, :string, :default => "postgres"
          config_param :psql_schema, :string, :default => "fluentd"
          config_param :psql_port, :integer, :default => 5432
    
          def configure(conf)
            super
            @db_conn = PG.connect(
              :host => @psql_host,
              :user => @psql_user,
              :password => @psql_pass,
              :dbname => @psql_schema,
              :port => @psql_port
            )
    
            @db_conn.prepare('get_network', 'SELECT * FROM ipam_networks WHERE network >> $1')
          end
    
          def filter(tag, time, record)
    
            # fetch infos from db - source
            @db_conn.exec_prepared('get_network', [record[@source_address_field]]) do |result|
              result.each do |row|
                record["src_country"] = row["country"]
                record["src_site_id"] = row["site_id"]
              end
            end
    
            # fetch infos from db - destination
            @db_conn.exec_prepared('get_network', [record[@destination_address_field]]) do |result|
              result.each do |row|
                record["dest_country"] = row["country"]
                record["dest_site_id"] = row["site_id"]
              end
            end
    
            record
          end
        end
      end
    end