hadoop, hdfs, flume, flume-ng

Apache Flume agent does not save the data in HDFS


I am trying to create an agent with Apache Flume, but I am new to this and don't have much experience yet. The agent has to receive data from netcat and save it in an HDFS file system. The data the agent will receive will look like this, for example:

1, E1, Eneko, Donostia

1, E2, Ane, Bilbo

2, E3, Julen, Baiona

2, E4, Jack, London

In netcat I can write the rows one by one; that is not a problem. But if a row begins with the number 1, it must be saved in the directory called manager (located in HDFS), and otherwise in another directory called developer (also located in HDFS).

I have written the following configuration file and the agent starts correctly. I can also send data from netcat, and the agent seems to listen correctly, since it replies with OK. But no row sent from netcat ever reaches HDFS; the directories (manager and developer) that I created are always empty.

I created the directories in the HDFS root with the following command: hadoop fs -mkdir ../../ <directory_name>
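
In other words, something along these lines (assuming the NameNode listens on localhost:8020, as in the sink paths below):

    # create both target directories at the HDFS root
    hadoop fs -mkdir -p /manager /developer
    # verify that they exist and check who owns them
    hadoop fs -ls hdfs://localhost:8020/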

No error appears in the log file (/var/log/flume-ng/flume.log).

Please help me. I've been checking many things and I don't know what else I can do.

Here is the Apache Flume configuration file:

a1.sources=r1
a1.channels=c1 c2
a1.sinks = k1 k2

a1.sources.r1.type=netcat
a1.sources.r1.bind=localhost
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100

a1.sources.r1.interceptors.i1.type=regex_extractor
a1.sources.r1.interceptors.i1.regex= ^(\\d)
a1.sources.r1.interceptors.i1.serializers=s1
a1.sources.r1.interceptors.i1.serializers.s1.name=Rola

a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=Rola
a1.sources.r1.selector.mapping.1=c1
a1.sources.r1.selector.mapping.2=c2

a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://localhost:8020/manager
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.fileStream=DataStream

a1.sinks.k2.type=hdfs
a1.sinks.k2.hdfs.path=hdfs://localhost:8020/developer
a1.sinks.k2.hdfs.writeFormat=Text
a1.sinks.k2.hdfs.fileStream=DataStream

a1.sources.r1.channels = c2 c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
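
For completeness, this is roughly how I start the agent and feed it test rows (the paths are illustrative and depend on the installation):

    # start the agent defined above; the --name value must match a1
    flume-ng agent --conf /etc/flume-ng/conf \
        --conf-file /etc/flume-ng/conf/a1.conf \
        --name a1 -Dflume.root.logger=INFO,console

    # in another terminal, open a session to the netcat source and type the rows
    # one by one; the source answers OK for every line it accepts
    nc localhost 44444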

Solution

  • The problem was that the interceptor was configured but never declared on the source. Once the following declaration is added, everything works correctly:

    a1.sources.r1.interceptors = i1
    

    The interceptors declaration must appear before the block that configures and uses the interceptor; the corrected source section is shown below for reference.
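
For reference, the corrected source section looks like this, with the interceptor chain declared before the properties that configure it (channels and sinks stay exactly as in the configuration above):

    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=44444

    # declare the interceptor chain on the source first
    a1.sources.r1.interceptors=i1
    a1.sources.r1.interceptors.i1.type=regex_extractor
    a1.sources.r1.interceptors.i1.regex=^(\\d)
    a1.sources.r1.interceptors.i1.serializers=s1
    a1.sources.r1.interceptors.i1.serializers.s1.name=Rola

    # the multiplexing selector routes on the Rola header set by the interceptor
    a1.sources.r1.selector.type=multiplexing
    a1.sources.r1.selector.header=Rola
    a1.sources.r1.selector.mapping.1=c1
    a1.sources.r1.selector.mapping.2=c2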