I have the following code to connect to Kafka
Properties props = new Properties();
props.put("bootstrap.servers", "myconfluentkafkabroker:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_CG");
props.put("group.instance.id", "my_instance_CG_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("key.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
props.put("value.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("MyTopic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
log.debug("topic = %s, partition = %d, offset = %d,"
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsKey(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
} finally {
consumer.close();
}
Code didn't throw any errors but I still don't see the consumer listed
would this be an issue?
props.put("group.instance.id", "my_instance_CG_id");
You should verify the information that you see with the built-in tools that Kafka provides like kafka-consumer-groups.sh
You'll also need to actually poll messages and commit offsets, not just subscribe before you will see anything.
Otherwise, for that specific Control Center dashboard, it may require you to add the Monitoring Interceptors into your client