As the title states, I have a Flume agent with a Kafka source that writes to an HDFS location, compressed as Avro, and I want to multiplex it so that the events are also written to a log file. I'm running Flume in a pod inside AKS.
This is what I have tried so far; here is the relevant part of my Flume configuration:
flumeagent.sources = kafkaSource
flumeagent.sources.kafkaSource.channels = kafkaChannel logChannel
flumeagent.sources.kafkaSource.selector.type = multiplexing
flumeagent.sources.kafkaSource.selector.default = kafkaChannel logChannel
flumeagent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
flumeagent.sources.kafkaSource.kafka.consumer.security.protocol = SSL
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.sources.kafkaSource.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.sources.kafkaSource.batchSize = 5000
flumeagent.sources.kafkaSource.topics = <TOPICS>
flumeagent.sources.kafkaSource.consumer.group.id = <GROUP_ID>
flumeagent.channels = kafkaChannel logChannel
flumeagent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel
flumeagent.channels.kafkaChannel.kafka.producer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.security.protocol = SSL
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.consumer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.truststore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.location = <LOCATION>
flumeagent.channels.kafkaChannel.kafka.producer.ssl.keystore.password = <PASSWORD>
flumeagent.channels.kafkaChannel.keep-alive = 120
flumeagent.channels.kafkaChannel.kafka.consumer.session.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.consumer.heartbeat.interval.ms = 10000
flumeagent.channels.kafkaChannel.kafka.producer.buffer.memory = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.request.timeout.ms = 40000
flumeagent.channels.kafkaChannel.kafka.producer.max.request.size = 50000000
flumeagent.channels.kafkaChannel.kafka.consumer.max.partition.fetch.bytes = 50000000
flumeagent.channels.kafkaChannel.transactionCapacity = 1000
flumeagent.channels.kafkaChannel.capacity = 50000
flumeagent.channels.kafkaChannel.kafka.topic = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.consumer.group.id = <TOPIC_NAME>
flumeagent.channels.kafkaChannel.kafka.bootstrap.servers = <SERVERS>
flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10
flumeagent.sinks = hdfsSink logSink
flumeagent.sinks.hdfsSink.channel = kafkaChannel
flumeagent.sinks.hdfsSink.type = hdfs
flumeagent.sinks.hdfsSink.hdfs.fileType = DataStream
flumeagent.sinks.hdfsSink.serializer = avro_event
flumeagent.sinks.hdfsSink.serializer.compressionCodec = snappy
flumeagent.sinks.hdfsSink.hdfs.fileSuffix = .avro
flumeagent.sinks.hdfsSink.hdfs.batchSize = 10
flumeagent.sinks.hdfsSink.hdfs.rollSize = 0
flumeagent.sinks.hdfsSink.hdfs.rollCount = 0
flumeagent.sinks.hdfsSink.hdfs.callTimeout = 60000
flumeagent.sinks.hdfsSink.hdfs.cleanPreviousTemps = true
flumeagent.sinks.hdfsSink.hdfs.inUsePrefix = .
flumeagent.sinks.hdfsSink.hdfs.rollInterval = 3600
flumeagent.sinks.hdfsSink.hdfs.maxPathCountToScan = 2
flumeagent.sinks.hdfsSink.hdfs.timeZone = <TIME_ZONE>
flumeagent.sinks.hdfsSink.hdfs.path = <HDFS PATH>
flumeagent.sinks.logSink.channel = logChannel
flumeagent.sinks.logSink.type = file_roll
flumeagent.sinks.logSink.sink.batchSize = 1
flumeagent.sinks.logSink.sink.directory = /var/log
flumeagent.sinks.logSink.sink.rollInterval = 0
flumeagent.sinks.logSink.sink.pathManager.prefix = monitor
flumeagent.sinks.logSink.sink.pathManager.extension = txt
To deploy this to AKS, I create a config map with
kubectl create configmap <name>.properties --from-file=flume.conf=<agent-name>.conf
and then apply it with
kubectl apply -f flume.yml
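For reference, this is roughly how the config map ends up mounted into the Flume pod (a minimal sketch; the names flume-config and flume-agent and the mount path /opt/flume/conf are assumptions, not copied from my actual flume.yml):
# Hypothetical excerpt of flume.yml; resource names and paths are assumptions
apiVersion: v1
kind: Pod
metadata:
  name: flume-agent
spec:
  containers:
    - name: flume
      image: <FLUME_IMAGE>
      # Start the agent against the configuration file mounted from the config map
      command: ["flume-ng", "agent", "-n", "flumeagent", "-c", "/opt/flume/conf", "-f", "/opt/flume/conf/flume.conf"]
      volumeMounts:
        - name: flume-config
          mountPath: /opt/flume/conf
  volumes:
    - name: flume-config
      configMap:
        name: <name>.properties
The key flume.conf comes from the --from-file argument above, so inside the container the agent reads the file at /opt/flume/conf/flume.conf.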
What I get is that Flume writes to the HDFS location successfully, and the file_roll sink creates the file in /var/log, but no data is ever written to that file.
What worked was replacing the memory channel with a jdbc channel.
Replace this
flumeagent.channels.logChannel.type = memory
flumeagent.channels.logChannel.capacity = 5000
flumeagent.channels.logChannel.transactionCapacity = 10
With
flumeagent.channels.logChannel.type = jdbc
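For completeness, the log path of the configuration then looks like the sketch below (the logSink definition stays exactly as before). The JDBC channel stores events in an embedded Derby database by default, so unlike the memory channel it persists events across agent restarts:
flumeagent.channels.logChannel.type = jdbc
flumeagent.sinks.logSink.channel = logChannel
flumeagent.sinks.logSink.type = file_roll
flumeagent.sinks.logSink.sink.directory = /var/log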