In my Flume flow, I want a custom dynamic HDFS path, but the interceptors are not populating any of the headers the path depends on.
Example data: 188 17 2016-06-01 00:31:10 6200.041736 0
Config:
agent2.sources.source2.interceptors = i2 i3 i4
agent2.sources.source2.interceptors.i2.type = regex_extractor
agent2.sources.source2.interceptors.i3.type = regex_extractor
agent2.sources.source2.interceptors.i4.type = regex_extractor
# regex to pick up the year
agent2.sources.source2.interceptors.i2.regex = (?<=\t)[0-9]{4}(?=-)
agent2.sources.source2.interceptors.i2.serializers = y
agent2.sources.source2.interceptors.i2.serializers.y.name = year
# regex to pick up the month
agent2.sources.source2.interceptors.i3.regex = (?<=-)[0-9]{2}(?=-)
agent2.sources.source2.interceptors.i3.serializers = m
agent2.sources.source2.interceptors.i3.serializers.m.name = month
# regex to pick up the day
agent2.sources.source2.interceptors.i4.regex = (?<=-)[0-9]{2}(?=\t)
agent2.sources.source2.interceptors.i4.serializers = d
agent2.sources.source2.interceptors.i4.serializers.d.name = day
# Define HDFS sink 2 (year and month)
agent2.sinks.sink-hdfs2.type = hdfs
agent2.sinks.sink-hdfs2.hdfs.path = /group-project/consumption/%{year}/%{month}
agent2.sinks.sink-hdfs2.hdfs.filePrefix = %{year}-%{month}
agent2.sinks.sink-hdfs2.hdfs.fileSuffix = .txt
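The look-behind for the year and the look-ahead for the day match only a single tab character. They will not match spaces or other whitespace, so you'd be better off using \\s.
Also, Flume requires two backslashes for regex escapes (\\t rather than \t), because the config follows the Java properties file format, in which a single backslash is itself consumed as an escape character.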
Alternatively, you could use a single regex that grabs the whole date and assign each of its capture groups to a different serializer, for example (\\d{4})-(\\d{2})-(\\d{2})
The Flume User Guide has a good example:
If the Flume event body contained
1:2:3.4foobar5
and the following configuration was used
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body but the following headers will have been added
one=>1, two=>2, three=>3
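Applied to the config in the question, the single-regex approach might look roughly like the sketch below. It assumes the date is the first yyyy-MM-dd pattern in the event body and that one interceptor (i2 is reused here) replaces the original three. The Flume User Guide describes regex_extractor as turning regex match groups into headers, and the original look-around regexes contain no capture groups, which may be why no headers were set. I have not run this against a live agent.

```properties
# Sketch: one regex_extractor with three capture groups (year, month, day).
# Assumption: the first yyyy-MM-dd match in the body is the date of interest.
agent2.sources.source2.interceptors = i2
agent2.sources.source2.interceptors.i2.type = regex_extractor
# Double backslashes, because this is a Java properties file.
agent2.sources.source2.interceptors.i2.regex = (\\d{4})-(\\d{2})-(\\d{2})
# Serializers are matched to capture groups in order: group 1 -> y, 2 -> m, 3 -> d.
agent2.sources.source2.interceptors.i2.serializers = y m d
agent2.sources.source2.interceptors.i2.serializers.y.name = year
agent2.sources.source2.interceptors.i2.serializers.m.name = month
agent2.sources.source2.interceptors.i2.serializers.d.name = day

# Sink path and prefix as in the question; they read the headers set above.
agent2.sinks.sink-hdfs2.hdfs.path = /group-project/consumption/%{year}/%{month}
agent2.sinks.sink-hdfs2.hdfs.filePrefix = %{year}-%{month}
```

For the example line in the question, the regex matches 2016-06-01, so the expected headers would be year=2016, month=06, day=01, and the sink path would resolve to /group-project/consumption/2016/06.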