kubectl, azure-aks, flume, flume-ng

Flume with Kafka Source not writing events using file_roll


As the title states, I have a Flume agent with a Kafka source that writes to an HDFS location, compressed as Avro, and I want to multiplex it so the events are also written to a log file. My Flume runs in a pod inside AKS.
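For context, Flume's multiplexing selector normally routes events by a header value; with only selector.default set (as I do below), every event is copied to each channel in the default list. A minimal sketch of header-based routing, just to show the mechanism (the env header and its mapping are hypothetical, not part of my setup):

flumeagent.sources.kafkaSource.selector.type = multiplexing
# Route on a hypothetical "env" header: prod events go only to kafkaChannel
flumeagent.sources.kafkaSource.selector.header = env
flumeagent.sources.kafkaSource.selector.mapping.prod = kafkaChannel
# Anything without a matching header value falls through to both channels
flumeagent.sources.kafkaSource.selector.default = kafkaChannel logChannel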

This is what I have tried so far; here is the relevant part of my Flume configuration:

flumeagent.sources = kafkaSource
flumeagent.sources.kafkaSource.channels = kafkaChannel logChannel
flumeagent.sources.kafkaSource.selector.type = multiplexing
flumeagent.sources.kafkaSource.selector.default = kafkaChannel logChannel
flumeagent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
flumeagent.sources.kafkaSource.kafka.consumer.security.protocol = SSL
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.sources.kafkaSource.batchSize = 5000
flumeagent.sources.kafkaSource.topics = <TOPICS>
flumeagent.sources.kafkaSource.consumer.group.id = <GROUP_ID>

flumeagent.channels = kafkaChannel logChannel
flumeagent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
flumeagent.channels.kafkaChannel.kafka.producer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.keep-alive = 120
flumeagent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.consumer.heartbeat.interval.ms = 10000
flumeagent.channels.kafkaChannel.kafka.producer.buffer.memory = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.request.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.producer.max.request.size = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.max.partition.fetch.bytes = 50000000
flumeagent.channels.kafkaChannel.transactionCapacity = 1000
flumeagent.channels.kafkaChannel.capacity = 50000
flumeagent.channels.kafkaChannel.kafka.topic = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.consumer.group.id = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.bootstrap.servers = <SERVERS>

flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10

flumeagent.sinks = hdfsSink logSink
flumeagent.sinks.hdfsSink.channel = kafkaChannel
flumeagent.sinks.hdfsSink.type = hdfs
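# avro_event writes events as Avro container files; DataStream avoids
# SequenceFile wrapping, and compressionCodec compresses inside the Avro file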
flumeagent.sinks.hdfsSink.hdfs.fileType = DataStream
flumeagent.sinks.hdfsSink.serializer = avro_event
flumeagent.sinks.hdfsSink.serializer.compressionCodec = snappy
flumeagent.sinks.hdfsSink.hdfs.fileSuffix = .avro
flumeagent.sinks.hdfsSink.hdfs.batchSize = 10
flumeagent.sinks.hdfsSink.hdfs.rollSize = 0
flumeagent.sinks.hdfsSink.hdfs.rollCount = 0
flumeagent.sinks.hdfsSink.hdfs.callTimeout = 60000 
flumeagent.sinks.hdfsSink.hdfs.cleanPreviousTemps = true
flumeagent.sinks.hdfsSink.hdfs.inUsePrefix = .
flumeagent.sinks.hdfsSink.hdfs.rollInterval = 3600
flumeagent.sinks.hdfsSink.hdfs.maxPathCountToScan = 2
flumeagent.sinks.hdfsSink.hdfs.timeZone = <TIME_ZONE>
flumeagent.sinks.hdfsSink.hdfs.path = <HDFS PATH>

flumeagent.sinks.logSink.channel = logChannel
flumeagent.sinks.logSink.type = file_roll
flumeagent.sinks.logSink.sink.batchSize = 1
flumeagent.sinks.logSink.sink.directory = /var/log
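# rollInterval = 0 disables rolling, so all events go to a single file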
flumeagent.sinks.logSink.sink.rollInterval = 0
flumeagent.sinks.logSink.sink.pathManager.prefix = monitor
flumeagent.sinks.logSink.sink.pathManager.extension = txt

To deploy this to AKS, I create a ConfigMap using

kubectl create configmap <name>.properties --from-file=flume.conf=<agent-name>.conf

To apply I use

kubectl apply -f flume.yml
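The flume.yml mounts that ConfigMap as a volume, so the flume.conf key shows up as a file inside the container. Roughly, it is a minimal single-container pod like the sketch below (the image, mount path, and flume-ng arguments are placeholders, not my exact manifest):

apiVersion: v1
kind: Pod
metadata:
  name: flume-agent
spec:
  containers:
    - name: flume
      image: <FLUME_IMAGE>
      # Point the agent at the mounted config; the agent name must match
      # the "flumeagent" prefix used in the properties file
      args: ["flume-ng", "agent", "-n", "flumeagent", "-f", "/etc/flume/flume.conf"]
      volumeMounts:
        - name: flume-config
          mountPath: /etc/flume
  volumes:
    - name: flume-config
      configMap:
        name: <name>.properties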

What I get is this: Flume writes to the HDFS location successfully, and the file_roll sink creates the file in /var/log, but no data is ever written to that file.


Solution

  • What worked was replacing the memory channel with a jdbc channel.

    Replace this

    flumeagent.channels.logChannel.type = memory
    flumeagent.channels.logChannel.capacity = 5000
    flumeagent.channels.logChannel.transactionCapacity = 10
    

    With

    flumeagent.channels.logChannel.type = jdbc
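
    The jdbc channel is Flume's durable channel; it persists events to an embedded Derby database by default, so unlike the memory channel its events survive restarts. If it needs tuning, the Flume user guide documents a handful of jdbc channel properties; a minimal sketch (the values here are illustrative, not what I actually run):

    flumeagent.channels.logChannel.type = jdbc
    # Embedded Derby is the default backing store
    flumeagent.channels.logChannel.db.type = DERBY
    # Maximum number of events the channel may hold (0 = unlimited)
    flumeagent.channels.logChannel.maximum.capacity = 5000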