I am using org.apache.flume.agent.embedded.EmbeddedAgent.
It is configured like this:
Map<String, String> configurationProperties = ...;
service.configure(configurationProperties);
where configurationProperties is set to:
{
"kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000",
"processor.type": "load_balance",
"sinks": "kafkaSink1",
"channel.keep-alive": "0",
"channel.checkpointDir": "********************************",
"kafkaSink.kafka.producer.reconnect.backoff.ms": "2000",
"channel.dataDirs": "********************************",
"kafkaSink.kafka.producer.retry.backoff.ms": "1000",
"processor.selector.maxTimeOut": "60000",
"kafkaSink.kafka.producer.max.request.size": "5485760",
"kafkaSink1.flumeBatchSize": "1000",
"kafkaSink.kafka.producer.buffer.memory": "67108864",
"kafkaSink.kafka.producer.client.id": "********************************",
"kafkaSink1.useFlumeEventFormat": "true",
"kafkaSink1.kafka.topic": "********************************",
"kafkaSink.kafka.producer.batch.size": "8196",
"channel.kafka.dataDirs": "********************************",
"kafkaSink1.type": "KAFKA",
"channel.backupCheckpointDir": "********************************",
"kafkaSink1.allowTopicOverride": "true",
"channel.useDualCheckpoints": "true",
"kafkaSink.kafka.producer.compression.type": "lz4",
"processor.maxBackoff": "60000",
"use_dual_channel": "true",
"channel.capacity": "1000000",
"channel.byteCapacityBufferPercentage": "50",
"channel.transactionCapacity": "1000",
"channel.byteCapacity": "10485760",
"channel.type": "file",
"processor.backoff": "true",
"channel.kafka.checkpointDir": "********************************",
"channel.kafka.backupCheckpointDir": "********************************",
"kafkaSink1.kafka.bootstrap.servers": "********************************",
"kafkaSink.kafka.producer.acks": "-1"
}
At runtime it throws the following:
java.lang.NullPointerException
at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133)
at org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40)
at org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161)
at org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99)
at ******************.startService(******************)
Flume then fails to start at all.
The code at org.apache.flume.conf.sink.SinkGroupConfiguration.configure (SinkGroupConfiguration.java:52) shows that it is looking for the sinks property, which is not empty, so it should not throw this error.
Does anyone know why? I could not find any documentation on this.
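One thing that stands out in the map above is that sinks declares kafkaSink1, while several keys use the prefix kafkaSink. (without the 1). Whether that has anything to do with the NullPointerException is not established here. As a rough sketch, a plain-Java pre-check like the one below could flag such mismatches before the map is handed to configure(). It does not use any Flume API: the class SinkConfigCheck, the method findProblems, and the list of known non-sink prefixes are all hypothetical names chosen for illustration, and the prefixes are simply the ones that appear in the map above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Flume: it only inspects the property map.
public class SinkConfigCheck {

    // Assumption: non-sink prefixes, taken from the keys shown in the question.
    private static final Set<String> KNOWN_PREFIXES =
            new HashSet<>(Arrays.asList("channel", "processor", "source"));

    // Returns human-readable findings; an empty list means nothing suspicious was found.
    public static List<String> findProblems(Map<String, String> props) {
        List<String> problems = new ArrayList<>();

        String sinks = props.get("sinks");
        if (sinks == null || sinks.trim().isEmpty()) {
            problems.add("'sinks' is missing or empty");
            return problems;
        }

        // Sink names are treated as whitespace-separated.
        Set<String> declared = new LinkedHashSet<>(Arrays.asList(sinks.trim().split("\\s+")));

        // Every declared sink should at least have a "<name>.type" entry.
        for (String sink : declared) {
            if (!props.containsKey(sink + ".type")) {
                problems.add("declared sink '" + sink + "' has no '" + sink + ".type' key");
            }
        }

        // Flag keys whose prefix is neither a declared sink nor a known prefix.
        // Keys are sorted so the findings come out in a stable order.
        for (String key : new TreeSet<>(props.keySet())) {
            int dot = key.indexOf('.');
            if (dot < 0) {
                continue; // top-level keys such as "sinks"
            }
            String prefix = key.substring(0, dot);
            if (!declared.contains(prefix) && !KNOWN_PREFIXES.contains(prefix)) {
                problems.add("key '" + key + "' uses prefix '" + prefix
                        + "', which is not a declared sink");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // A small excerpt of the map from the question.
        Map<String, String> props = new HashMap<>();
        props.put("sinks", "kafkaSink1");
        props.put("kafkaSink1.type", "KAFKA");
        props.put("kafkaSink.kafka.producer.acks", "-1");
        props.put("channel.type", "file");
        props.put("processor.type", "load_balance");

        findProblems(props).forEach(System.out::println);
    }
}
```

Running the check on the full map right before service.configure(configurationProperties) would list every key under the kafkaSink. prefix, which at least narrows down which entries to look at first.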
In case someone stumbles upon this issue: after looking into it further, I believe the Apache Flume project is dead in the water, and we are going to stop using it. For details, see the following:
Many open issues with no responses: https://cwiki.apache.org/confluence/display/FLUME/Developer+Section
The last version was released almost 2 years ago (in the beginning there was a release every 6 months): https://flume.apache.org/index.html
The wiki has not even been updated for the latest versions; the last update was more than 5 years ago: https://cwiki.apache.org/confluence/display/FLUME/Home
Sadly, I am going to have to close this question this way. I hope this helps someone.