I am using the HTTPSource in Flume to receive POST events in JSON format, as follows:
{"username":"xyz","password":"123"}
My question is: do I have to modify the source of the events (I mean, the process that is sending the JSON to Flume) so that the JSON has the following format:
[{
  "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
  },
  "body" : "{\"username\":\"xyz\",\"password\":\"123\"}"
}]
Is this the best way to do it? Or can I modify it somewhere else?
My conf file for the Flume agent is:
## Components
SomeAgent.sources = SomeHTTP
SomeAgent.channels = MemChannel
SomeAgent.sinks = SomeHDFS
## Source and interceptors
SomeAgent.sources.SomeHTTP.type = http
SomeAgent.sources.SomeHTTP.port = 5140
SomeAgent.sources.SomeHTTP.handler = org.apache.flume.source.http.JSONHandler
SomeAgent.sources.SomeHTTP.channels = MemChannel
SomeAgent.sources.SomeHTTP.interceptors = i1 i2
## Interceptors
SomeAgent.sources.SomeHTTP.interceptors.i1.type = timestamp
SomeAgent.sources.SomeHTTP.interceptors.i2.type = host
SomeAgent.sources.SomeHTTP.interceptors.i2.hostHeader = hostname
## Channel
SomeAgent.channels.MemChannel.type = memory
SomeAgent.channels.MemChannel.capacity = 10000
SomeAgent.channels.MemChannel.transactionCapacity = 1000
## Sink
SomeAgent.sinks.SomeHDFS.type = hdfs
SomeAgent.sinks.SomeHDFS.channel = MemChannel
SomeAgent.sinks.SomeHDFS.hdfs.path = /raw/logs/%Y-%m-%d
SomeAgent.sinks.SomeHDFS.hdfs.fileType = DataStream
SomeAgent.sinks.SomeHDFS.hdfs.filePrefix = SomeLogs-
SomeAgent.sinks.SomeHDFS.hdfs.writeFormat = Text
SomeAgent.sinks.SomeHDFS.hdfs.batchSize = 100
SomeAgent.sinks.SomeHDFS.hdfs.rollSize = 0
SomeAgent.sinks.SomeHDFS.hdfs.rollCount = 10000
SomeAgent.sinks.SomeHDFS.hdfs.rollInterval = 600
SomeAgent.sinks.SomeHDFS.hdfs.useLocalTimeStamp = true
Listing the output directory and cat-ing a file with hadoop fs:
$ hadoop fs -ls -R /raw/logs/somes
drwxr-xr-x - flume-agent supergroup 0 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16
-rw-r--r-- 3 flume-agent supergroup 3814 2015-06-16 12:33 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434471803369
-rw-r--r-- 3 flume-agent supergroup 3719 2015-06-16 12:43 /raw/logs/arquimedes/2015-06-16/SomeLogs.1434472404774
$ hadoop fs -cat /raw/logs/somes/2015-06-16/SomeLogs.1434471803369 | head
$
(you read that correctly: empty lines)
If I now look at the file (using the binary view in HUE, for example):
0000000: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
0000010: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
0000020: 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a 0a ................
If I've understood correctly, you want to serialize both the data and the headers. In that case, you do not have to modify the data source; instead, use some standard Flume elements and create a custom serializer for HDFS.
The first step is to get Flume to create the desired JSON structure, i.e. headers+body. Flume can do this for you: just use JSONHandler at your HTTPSource, this way:
a1.sources = r1
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
In fact, it is not necessary to configure the JSON handler at all, since it is the default one for HTTPSource.
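For reference, JSONHandler expects the request body to be a JSON array of events. Below is a minimal sketch of such a POST from the sender side; the class name and the use of java.net.http (Java 11+) are my assumptions, and the port is taken from your configuration:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PostToFlume {
    public static void main(String[] args) throws Exception {
        // JSONHandler expects an array of events; the body is the original JSON,
        // escaped as a string. Headers can be left out here because the
        // timestamp/host interceptors described next add them on the agent side.
        String payload =
            "[{\"body\": \"{\\\"username\\\":\\\"xyz\\\",\\\"password\\\":\\\"123\\\"}\"}]";

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:5140"))   // port from the question's config
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(payload))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}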
Then, use both the Timestamp Interceptor and the Host Interceptor in order to add the desired headers. The only trick is that the Flume agent must run on the same machine as the sender process, so that the intercepted host is the same as the sender's:
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
At this point, you will have the desired event. Nevertheless, the standard serializers for HDFS only save the body, not the headers. So create a custom serializer that implements org.apache.flume.serialization.EventSerializer, expose it through an EventSerializer.Builder, and reference the Builder's fully-qualified class name in the sink's serializer property. It is configured as:
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.serializer = com.example.flume.HeaderAndBodyJsonSerializer$Builder
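As a starting point, here is a minimal sketch of such a serializer. The package and class names above and in the sketch are hypothetical, and the hand-rolled JSON writing is for illustration only; real code should use a JSON library such as Gson or Jackson for proper escaping:

package com.example.flume;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;

// Writes one JSON object per line containing both the headers and the body.
public class HeaderAndBodyJsonSerializer implements EventSerializer {

    private final OutputStream out;

    private HeaderAndBodyJsonSerializer(Context context, OutputStream out) {
        this.out = out;
    }

    @Override
    public void afterCreate() throws IOException { /* nothing to do */ }

    @Override
    public void afterReopen() throws IOException { /* nothing to do */ }

    @Override
    public void write(Event event) throws IOException {
        // Naive JSON assembly for illustration only; it does not escape
        // special characters inside header values.
        StringBuilder sb = new StringBuilder("{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> h : event.getHeaders().entrySet()) {
            if (!first) { sb.append(','); }
            sb.append('"').append(h.getKey()).append("\":\"").append(h.getValue()).append('"');
            first = false;
        }
        // The body already contains the JSON sent by the client, so it is embedded as-is.
        sb.append("},\"body\":")
          .append(new String(event.getBody(), StandardCharsets.UTF_8))
          .append("}\n");
        out.write(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void beforeClose() throws IOException { /* nothing to do */ }

    @Override
    public boolean supportsReopen() {
        return false;
    }

    // The fully-qualified name of this Builder is what goes in the
    // "serializer" property of the HDFS sink.
    public static class Builder implements EventSerializer.Builder {
        @Override
        public EventSerializer build(Context context, OutputStream out) {
            return new HeaderAndBodyJsonSerializer(context, out);
        }
    }
}

The compiled class must be on the Flume agent's classpath (for example, packaged in a jar and dropped under plugins.d) so that the agent can instantiate the Builder named in the configuration.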
HTH