I am building a Java Spring application using Storm 1.1.2 and Kafka 0.11 to be launched in a Docker container.
Everything in my topology works as planned, but under high load from Kafka the consumer lag keeps growing over time.
My KafkaSpoutConfig:
KafkaSpoutConfig<String,String> spoutConf =
KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
.build();
Then my topology is as follows:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);
builder.setBolt("routerBolt", new RouterBolt(), 25).shuffleGrouping("stormKafkaSpout");
Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/opt/storm.jar");
StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());
The RouterBolt (which extends BaseRichBolt) runs one very simple switch statement and then uses a local KafkaProducer object to send a new message to another topic. As I said, everything compiles and the topology runs as expected, but under high load (3000 messages/s) the Kafka lag just piles up, which means the topology's throughput is too low.
I've tried disabling acking with
conf.setNumAckers(0);
and
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
but I guess it's not an acking issue.
I've seen in the Storm UI that the RouterBolt has an execute latency of 1.2 ms and a process latency of 0.03 ms under high load, which leads me to believe the spout is the bottleneck. Also, the parallelism hint is 25 because "myTopic" has 25 partitions. Thanks!
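As a rough sanity check on that reasoning, here is a minimal back-of-the-envelope sketch using the numbers above. It assumes each bolt executor handles tuples one at a time and that the execute latency shown in the UI is a fair per-tuple average; the class and method names are hypothetical and only for illustration.

```java
// Back-of-the-envelope check using the numbers quoted in the question.
// Assumption: each executor handles tuples serially, so its ceiling is
// roughly 1000 ms divided by the average execute latency in ms.
final class BoltCapacityEstimate {

    /** Approximate upper bound on tuples per second for a bolt. */
    static double maxTuplesPerSecond(double executeLatencyMs, int executors) {
        return (1000.0 / executeLatencyMs) * executors;
    }

    /** Fraction of that upper bound consumed by the offered load. */
    static double utilization(double offeredPerSecond, double executeLatencyMs, int executors) {
        return offeredPerSecond / maxTuplesPerSecond(executeLatencyMs, executors);
    }

    public static void main(String[] args) {
        double latencyMs = 1.2;   // RouterBolt execute latency from the Storm UI
        int executors = 25;       // parallelism hint of routerBolt
        double load = 3000.0;     // messages per second under high load

        System.out.printf("max throughput ~ %.0f tuples/s%n",
                maxTuplesPerSecond(latencyMs, executors));
        System.out.printf("utilization ~ %.2f%n",
                utilization(load, latencyMs, executors));
    }
}
```

Under those assumptions the bolt could absorb roughly 20,800 tuples/s, so 3000 messages/s would use only about 14% of it, which is consistent with the bolt not being the limiting stage.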
You may be affected by https://issues.apache.org/jira/browse/STORM-3102, which causes the spout to do a pretty expensive call on every emit. Please try upgrading to one of the fixed versions.
Edit: The fix isn't actually released yet. You might still want to try out the fix by building the spout from source using e.g. https://github.com/apache/storm/tree/1.1.x-branch to build a 1.1.4 snapshot.