Writing data to the Hive warehouse directory in two separate tables using Flume


I want to write data to the Hive warehouse directory, into two separate tables called flumemaleemployee and flumefemaleemployee. The last 3 records should be inserted into the female table and the first 3 records into the male table. Below is my data:

1,alok,mumbai
1,jatin,chennai
1,yogesh,kolkata
2,ragini,delhi
2,jyotsana,pune
1,valmiki,banglore  

Below is my Flume configuration:

agent.sources = tailsrc 
agent.channels = mem1 mem2 
agent.sinks = stdl std2 
agent.sources.tailsrc.type = exec 
agent.sources.tailsrc.command = tail -F /home/cloudera/Desktop/in.txt 
agent.sources.tailsrc.batchSize = 1 
agent.sources.tailsrc.interceptors = i1 
agent.sources.tailsrc.interceptors.i1.type = regex_extractor 
agent.sources.tailsrc.interceptors.il.regex = A(\\d} 
agent.sources.tailsrc. interceptors. M.serializers = t1 
agent.sources.tailsrc. interceptors, i1.serializers.t1. name = type 
agent.sources.tailsrc.selector.type = multiplexing 
agent.sources.tailsrc.selector.header = type 
agent.sources.tailsrc.selector.mapping.1 = mem1 
agent.sources.tailsrc.selector.mapping.2 = mem2 
agent.sinks.std1.type = hdfs 
agent.sinks.stdl.channel = mem1 
agent.sinks.stdl.batchSize = 1 
agent.sinks.std1.hdfs.path = /user/hive/warehouse/aisehibanayatp.db/flumemaleemployee
agent.sinks.stdl.rolllnterval = 0 
agent.sinks.stdl.hdfs.fileType = DataStream 
agent.sinks.std2.type = hdfs 
agent.sinks.std2.channel = mem2 
agent.sinks.std2.batchSize = 1 
agent.sinks.std2.hdfs.path = /user/hi ve/warehouse/aisehibanayatp.db/flumefemaleemployee
agent.sinks.std2.rolllnterval = 0 
agent.sinks.std2.hdfs.fileType = DataStream 
agent.channels.mem1.type = memory 
agent.channels.meml.capacity = 100 
agent.channels.mem2.type = memory 
agent.channels.mem2.capacity = 100 
agent.sources.tailsrc.channels = mem1 mem2  

I am not getting any error, but when I start the Flume agent with the command below it just gets stuck. I don't know how to deal with that, since no error is reported.

flume-ng agent --name agent -conf-file /home/cloudera/Desktop/flume1.config  

and it gets stuck at the step below:

18/11/13 08:03:00 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: mem2. channel.event.take.success == 0
18/11/13 08:03:00 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{std2=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@17ade71c counterGroup:{ name:null counters:{} } }} channels:{mem2=org.apache.flume.channel.MemoryChannel{name: mem2}} }
18/11/13 08:03:00 INFO node.Application: Starting Channel mem2
18/11/13 08:03:00 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: mem2 started
18/11/13 08:03:00 INFO node.Application: Starting Sink std2
18/11/13 08:03:00 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: std2: Successfully registered new MBean.
18/11/13 08:03:00 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: std2 started

So how can I achieve this?


Solution

  • The problem is typos, stray spaces inside property names, and the letter l used in place of the digit 1 (for example stdl instead of std1). I fixed these and it ran. I also altered your regex, which you can tailor to your data, but most of it is an accuracy issue. Use the file as follows, with your own HDFS path and settings of course:

    agent.sources = tailsrc 
    agent.channels = mem1 mem2 
    agent.sinks = std1 std2 
    agent.sources.tailsrc.type = exec 
    agent.sources.tailsrc.command = tail -F /home/cloudera/in.txt 
    agent.sources.tailsrc.batchSize = 1 
    agent.sources.tailsrc.interceptors = i1 
    agent.sources.tailsrc.interceptors.i1.type = regex_extractor 
    # Anchored to the leading field so a 1 or 2 later in the line is not captured.
    agent.sources.tailsrc.interceptors.i1.regex = ^(1|2),
    agent.sources.tailsrc.interceptors.i1.serializers = t1 
    agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type 
    agent.sources.tailsrc.selector.type = multiplexing 
    agent.sources.tailsrc.selector.header = type 
    agent.sources.tailsrc.selector.mapping.1 = mem1 
    agent.sources.tailsrc.selector.mapping.2 = mem2
    # HDFS sink settings take the hdfs. prefix (hdfs.batchSize, hdfs.rollInterval).
    agent.sinks.std1.type = hdfs
    agent.sinks.std1.channel = mem1
    agent.sinks.std1.hdfs.batchSize = 1
    agent.sinks.std1.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/flumemaleemployee
    agent.sinks.std1.hdfs.rollInterval = 0
    agent.sinks.std1.hdfs.fileType = DataStream
    agent.sinks.std2.type = hdfs
    agent.sinks.std2.channel = mem2
    agent.sinks.std2.hdfs.batchSize = 1
    agent.sinks.std2.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/flumefemaleemployee
    agent.sinks.std2.hdfs.rollInterval = 0
    agent.sinks.std2.hdfs.fileType = DataStream
    agent.channels.mem1.type = memory
    agent.channels.mem1.capacity = 100
    agent.channels.mem2.type = memory 
    agent.channels.mem2.capacity = 100 
    agent.sources.tailsrc.channels = mem1 mem2
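
Since Flume did not report these mismatches as errors here, a small pre-flight check can help catch them before the agent is started. The following is only a minimal sketch in Python, not part of Flume: the function name `undeclared_components` and the sample text are hypothetical, and it assumes the usual key layout `<agent>.<sources|channels|sinks>.<name>.<property>`. It reports component names that are configured but never declared, which is what happens when `stdl` is declared and `std1` is configured.

```python
# Hypothetical helper for illustration; it is not a Flume API.
def undeclared_components(conf_text, agent="agent"):
    """Return names that are configured but missing from the declaration lines.

    A name is declared when it is listed in `<agent>.sources`,
    `<agent>.channels` or `<agent>.sinks`. It is configured when it appears
    as the third dot-separated segment of a key, e.g. `agent.sinks.std1.type`.
    """
    kinds = ("sources", "channels", "sinks")
    declared = {kind: set() for kind in kinds}
    used = {kind: set() for kind in kinds}
    for raw in conf_text.splitlines():
        line = raw.strip()
        # Skip blank lines, comments, and anything that is not key = value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        parts = key.split(".")
        if len(parts) < 2 or parts[0] != agent or parts[1] not in declared:
            continue
        kind = parts[1]
        if len(parts) == 2:
            # Declaration line such as `agent.sinks = std1 std2`.
            declared[kind].update(value.split())
        else:
            # Property line such as `agent.sinks.std1.type = hdfs`.
            used[kind].add(parts[2])
    return {kind: sorted(used[kind] - declared[kind]) for kind in kinds}


# Sample lines modelled on the mismatches in the question's configuration.
sample = """
agent.sinks = stdl std2
agent.sinks.std1.type = hdfs
agent.sinks.stdl.channel = mem1
agent.channels = mem1 mem2
agent.channels.meml.capacity = 100
"""

print(undeclared_components(sample))
# {'sources': [], 'channels': ['meml'], 'sinks': ['std1']}
```

An empty list for every kind only means the names are consistent with each other. It does not validate property names such as `hdfs.rollInterval`, so those still need to be checked against the Flume documentation.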