
Logstash: create a new file every 30 seconds


I have the following filter configuration in my Logstash pipeline. When the pipeline starts, the ruby filter's init block creates a CSV file with a header row; for each event, the filter's code block then stores that file name in the event metadata. The csv output writes each event to that file.

The requirement I have is: every X seconds, we need to create a new CSV file and write to that file instead. I am not a Ruby expert and couldn't find any clues with a Google search. Can someone please advise?

  filter {
    ruby {
       init => "
            begin
                randval = (0...8).map { (65 + rand(26)).chr }.join      
                @csv_file = 'output'+randval+'.csv'
                csv_headers = ['YYYY-MM-ddTHH:mm:ss.SSSZ','Log Level','Event ID']
                if File.zero?(@csv_file) || !File.exist?(@csv_file)
                    CSV.open(@csv_file, 'w') do |csv|
                        csv << csv_headers
                    end
                    
                end
            end
        "
        code => '
                event.set("[@metadata][suffix]",@csv_file)
        '
    }
}


output {
   file {
      path => "output.log"
   }    
   csv {
       fields => [ "created", "level", "code"]
       path => "%{[@metadata][suffix]}"
    }
}

Solution

  • Funnily enough, someone asked exactly this question at discuss.elastic.co yesterday, complete with the same unnecessary init option, so I happen to know that the answer is

    ruby {
        code => '
                event.set("[@metadata][suffix]", "output" + (Time.now.to_i / 30).to_s + ".csv")
        '
    }
    

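    Because Time.now.to_i / 30 is an integer division, its value stays the same for 30 seconds and then increases by one. The output therefore writes to a different file for each 30-second interval in which events arrive.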
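
To make the interval arithmetic concrete, here is a minimal plain-Ruby sketch that can be run outside Logstash. The helper name bucket_file_name and the fixed timestamp are assumptions made for illustration only; they are not part of the pipeline above.

```ruby
# Hypothetical helper (not part of Logstash): maps a timestamp to the
# file name used for the interval that contains it.
def bucket_file_name(time, interval_seconds = 30)
  # Integer division: the result is constant within one interval
  # and increases by one when the next interval starts.
  bucket = time.to_i / interval_seconds
  "output#{bucket}.csv"
end

t = Time.at(1_700_000_010)        # arbitrary fixed time, start of a 30 s window
puts bucket_file_name(t)          # first second of the window
puts bucket_file_name(t + 29)     # last second of the same window, same name
puts bucket_file_name(t + 30)     # first second of the next window, new name
```

Note that the windows are aligned to the Unix epoch, not to the time the pipeline started, so the first file after startup may cover less than a full interval.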

    I am not aware of any way to add a header just for the first event written to the file by an output. A csv output can add headers for every row. A csv codec can write headers once, but would not write them again when the file name changed.

    That said, if you are just writing to a file you could do the write in the ruby filter and keep track of whether a header has been written for the current value of Time.now.to_i / 30. You could do that using something similar to this. I re-purposed some of the code from the csv output.

    input { heartbeat { interval => 5 message => '{ "foo": 1, "bar": 2, "baz": 3 }' } }
    filter {
        json { source => "message" target => "data" remove_field => [ "message" ] }
        ruby {
            init => '
                @fields = [ "[data][bar]", "[data][baz]" ]
                @csv_options = Hash.new
                @spreadsheet_safe = true
            '
            code => '
                def event_to_csv(event)
                    csv_values = @fields.map {|name| get_value(name, event)}
                    csv_values.to_csv(@csv_options)
                end
    
                def get_value(name, event)
                    val = event.get(name)
                    val.is_a?(Hash) ? LogStash::Json.dump(val) : escape_csv(val)
                end
    
                def escape_csv(val)
                    (@spreadsheet_safe && val.is_a?(String) && val.start_with?("=")) ? "\'#{val}" : val
                end
    
                id = Time.now.to_i / 30
                file = "output" + id.to_s + ".csv"
    
                fd = open("/tmp/#{file}", "a")
                if id != @last_id
                    chunk = "bar,baz\n"
                    fd.write(chunk)
                end
    
                chunk = event_to_csv(event)
                fd.write(chunk)
                fd.close
    
                @last_id = id
                event.cancel
            '
        }
    }
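
The header-once-per-file idea can also be tried outside Logstash. The following is only a sketch under stated assumptions: the class RotatingCsvWriter, its method names, and the column names are hypothetical, and instead of remembering @last_id it checks whether the target file is missing or empty, which is a swapped-in variation on the filter above rather than the same technique.

```ruby
require "csv"
require "tmpdir"

# Hypothetical standalone writer, not part of Logstash.
class RotatingCsvWriter
  def initialize(dir, headers, interval_seconds = 30)
    @dir = dir
    @headers = headers
    @interval = interval_seconds
  end

  # Path of the file for the interval that contains `now`.
  def path_for(now)
    File.join(@dir, "output#{now.to_i / @interval}.csv")
  end

  # Appends one row; writes the header first if the file is new or empty.
  # Returns the path that was written to.
  def write(row, now = Time.now)
    path = path_for(now)
    needs_header = !File.exist?(path) || File.zero?(path)
    File.open(path, "a") do |f|
      f.write(@headers.to_csv) if needs_header
      f.write(row.to_csv)
    end
    path
  end
end

Dir.mktmpdir do |dir|
  writer = RotatingCsvWriter.new(dir, %w[bar baz])
  t = Time.at(1_700_000_010)               # arbitrary fixed start time
  first  = writer.write([2, 3], t)
  writer.write([4, 5], t + 5)              # same 30 s window, same file
  second = writer.write([6, 7], t + 30)    # next window, new file with its own header
  puts File.read(first)
  puts File.read(second)
end
```

Checking the file itself means a restart in the middle of an interval does not write a second header, at the cost of an extra file-system call per row.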