flume configuration - spoolDir and file roll sink's variable


I've searched and found multiple posts about how to use 'fileHeader' and 'basenameHeader' with the spooling directory source, but none of them seems to work in my case.

My goal is to use Flume to ingest file(s) from a local directory into a blob directory using file_roll sink and to retain the file name or at least put the file in a directory that has the same name as the file name. For example,

/input/january.txt -> /blob_output/january.[timestamp] or /input/january.txt -> /blob_output/january/[timestamp].txt

The problem is that I can't seem to reference the file name or basename variable: it always resolves to nothing (or null), so the output name contains only a timestamp. Here's my Flume configuration:

training_agent.sources = src1
training_agent.channels = ch1
training_agent.sinks = sink1

training_agent.sources.src1.type = spooldir
training_agent.sources.src1.channels = ch1
training_agent.sources.src1.spoolDir = /home/training/input/
training_agent.sources.src1.batchSize = 5
training_agent.sources.src1.fileHeader = true

training_agent.channels.ch1.type = memory
training_agent.channels.capacity = 500
training_agent.channels.ch1.transactionCapacity = 50

training_agent.sinks.sink1.type = file_roll
training_agent.sinks.sink1.sink.rollInterval = 0
training_agent.sinks.sink1.channel = ch1
training_agent.sinks.sink1.sink.directory = /blobtraining_path/destination
training_agent.sinks.sink1.sink.pathManager.prefix = ${file}

Solution

  • The File Roll Sink documentation lists no option for specifying the name of the output file.

    I checked the source code for a way around this, but there is no simple one. Flume uses only the current timestamp to generate the file name. You can specify only a prefix and an extension for the output file.

    However, you have two options. You can extend DefaultPathManager and add this ability, or you can create a custom sink (your own rolling file sink) that reads the file name header from the Flume event headers and uses it to give the output file the same name as the input file.

    To create a custom sink, take a look at the Sink section of the Flume Developer Guide.

    Extra information:

    training_agent.sinks.sink1.sink.pathManager.prefix = ${file}
    

    The ${file} in this line is environment-variable syntax, and you cannot use an environment variable directly in Flume configuration files. If you want to use an environment variable in your Flume configuration, you have to add this option to the command that starts the agent:

    -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties
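
To make the custom sink idea above more concrete, here is a minimal sketch of the naming logic such a sink could apply. It is not Flume code: the class HeaderFileNamer and its method outputName are hypothetical names made up for this illustration. It assumes the spooling directory source writes the absolute path under the default header key "file" (fileHeader = true) and the file name under the default key "basename" (basenameHeader = true, which the configuration in the question does not set).

```java
import java.util.Map;

/** Hypothetical helper for a custom sink; not part of Flume. */
final class HeaderFileNamer {
    private HeaderFileNamer() {}

    /**
     * Builds an output name such as "january.1700000000000".
     * Prefers the "basename" header, then falls back to the last path
     * segment of the "file" header, then to the bare timestamp.
     */
    static String outputName(Map<String, String> headers, long timestampMillis) {
        String name = headers.get("basename");
        if (name == null || name.isEmpty()) {
            String path = headers.get("file");
            if (path != null && !path.isEmpty()) {
                // Keep only the part after the last '/' of the absolute path.
                name = path.substring(path.lastIndexOf('/') + 1);
            }
        }
        if (name == null || name.isEmpty()) {
            // No usable header: fall back to a timestamp-only name.
            return Long.toString(timestampMillis);
        }
        // Drop the original extension so "january.txt" becomes "january".
        int dot = name.lastIndexOf('.');
        String stem = dot > 0 ? name.substring(0, dot) : name;
        return stem + "." + timestampMillis;
    }
}
```

Inside a custom sink, the map would come from event.getHeaders() on each event taken from the channel, and the returned string would be used as the file name under the sink's output directory. Opening, writing, and rolling the files are left out of this sketch.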