java apache-kafka flume flume-ng

Apache Flume Java client fails to start with a single Kafka sink


I am using org.apache.flume.agent.embedded.EmbeddedAgent, configured as follows:

Map<String, String> configurationProperties = ...;
service.configure(configurationProperties);

Here configurationProperties is set to:

    {
      "kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000",
      "processor.type": "load_balance",
      "sinks": "kafkaSink1",
      "channel.keep-alive": "0",
      "channel.checkpointDir": "********************************",
      "kafkaSink.kafka.producer.reconnect.backoff.ms": "2000",
      "channel.dataDirs": "********************************",
      "kafkaSink.kafka.producer.retry.backoff.ms": "1000",
      "processor.selector.maxTimeOut": "60000",
      "kafkaSink.kafka.producer.max.request.size": "5485760",
      "kafkaSink1.flumeBatchSize": "1000",
      "kafkaSink.kafka.producer.buffer.memory": "67108864",
      "kafkaSink.kafka.producer.client.id": "********************************",
      "kafkaSink1.useFlumeEventFormat": "true",
      "kafkaSink1.kafka.topic": "********************************",
      "kafkaSink.kafka.producer.batch.size": "8196",
      "channel.kafka.dataDirs": "********************************",
      "kafkaSink1.type": "KAFKA",
      "channel.backupCheckpointDir": "********************************",
      "kafkaSink1.allowTopicOverride": "true",
      "channel.useDualCheckpoints": "true",
      "kafkaSink.kafka.producer.compression.type": "lz4",
      "processor.maxBackoff": "60000",
      "use_dual_channel": "true",
      "channel.capacity": "1000000",
      "channel.byteCapacityBufferPercentage": "50",
      "channel.transactionCapacity": "1000",
      "channel.byteCapacity": "10485760",
      "channel.type": "file",
      "processor.backoff": "true",
      "channel.kafka.checkpointDir": "********************************",
      "channel.kafka.backupCheckpointDir": "********************************",
      "kafkaSink1.kafka.bootstrap.servers": "********************************",
      "kafkaSink.kafka.producer.acks": "-1"
    }

At runtime it throws the following:

java.lang.NullPointerException
        at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384)
        at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228)
        at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153)
        at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133)
        at org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40)
        at org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161)
        at org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99)
        at ******************.startService(******************)

Flume then does not start at all.

The code at org.apache.flume.conf.sink.SinkGroupConfiguration.configure (SinkGroupConfiguration.java:52) shows that it looks up the sinks property, which is not empty here, so it should not throw this error.

Does anyone know why? I could not find any documentation about this.
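One thing that can be checked mechanically in a flat property map like the one above is whether every dotted key starts with a prefix the agent would recognise. In the posted configuration, sinks declares only kafkaSink1, yet many keys start with kafkaSink. (no trailing 1). The sketch below is a hypothetical helper, not part of Flume: it assumes the embedded agent's top-level prefixes are source, channel and processor plus the names listed in sinks, and it only reports keys outside that set. Whether such keys have anything to do with the NullPointerException is not established here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class SinkPrefixCheck {

    // Assumption: these are the reserved top-level prefixes of the flat key layout.
    private static final Set<String> RESERVED =
            new HashSet<>(Arrays.asList("source", "channel", "processor"));

    /** Returns, in sorted order, dotted keys whose first segment is neither reserved nor a declared sink. */
    static List<String> keysWithUndeclaredPrefix(Map<String, String> props) {
        Set<String> known = new HashSet<>(RESERVED);
        String sinks = props.getOrDefault("sinks", "").trim();
        if (!sinks.isEmpty()) {
            // Sink names are treated as a whitespace-separated list.
            known.addAll(Arrays.asList(sinks.split("\\s+")));
        }
        List<String> suspicious = new ArrayList<>();
        for (String key : new TreeMap<>(props).keySet()) {
            int dot = key.indexOf('.');
            if (dot < 0) {
                continue; // keys without a dot (e.g. "sinks") carry no prefix to check
            }
            if (!known.contains(key.substring(0, dot))) {
                suspicious.add(key);
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        // A small excerpt of the configuration from the question.
        Map<String, String> props = new TreeMap<>();
        props.put("sinks", "kafkaSink1");
        props.put("kafkaSink1.type", "KAFKA");
        props.put("kafkaSink.kafka.producer.acks", "-1");
        props.put("channel.type", "file");
        props.put("processor.type", "load_balance");
        System.out.println(keysWithUndeclaredPrefix(props));
        // prints [kafkaSink.kafka.producer.acks]
    }
}
```

Running it against the full map would list every kafkaSink.* key, which at least makes the mismatch with the declared sink name visible before the configuration is handed to configure().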


Solution

  • In case someone stumbles upon this issue: after looking into it further, I believe the Apache Flume project is dead in the water, and we're going to stop using it. For details please see the following:

    Sadly, I'm going to have to close this question this way. I hope this helps someone.