I try to send message to exact Kafka topic and then receive message from it. I have 3 configuration classes for Consumer, Producer and Topic:
public class KafkaTopicConfiguration {
@Autowired
GlobalConfiguration globalConfiguration;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
return new KafkaAdmin(configs);
}
}
public class KafkaConsumerConfiguration {
@Autowired
GlobalConfiguration globalConfiguration;
@Bean
public Consumer<String, String> consumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
globalConfiguration.kafka().bootstrapAddress());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
globalConfiguration.kafka().groupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
}
public class KafkaProducerConfiguration {
@Autowired
GlobalConfiguration globalConfiguration;
@Bean
public KafkaProducer<String, String> kafkaProducer() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConfiguration.kafka().bootstrapAddress());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaProducer<>(configProps);
}
}
GlobalConfiguration class stores all my properties. For Kafka:
bootstrapAddress = "localhost:9092"
groupId = "KafkaExampleConsumer"
Then I send message in this way
private void sendMessage(final String topic, final String message) {
kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
Future<RecordMetadata> sendResponse = kafkaProducer.send(producerRecord);
kafkaProducer.flush();
boolean isSent = sendResponse.isDone();
}
I check if message is sent with sendResponse.isDone() and it returns true. But then I try to receive message:
protected String receiveMessage(final String topic, final String message) {
kafkaAdmin.createOrModifyTopics(new NewTopic(topic, 1, (short) 1));
consumer.subscribe(Collections.singleton(topic));
ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofMillis(10000));
consumerRecords.isEmpty();
}
And ConsumerRecords always empty. What problem it can be?
If you want to consume previously sent records, rather than poll records sent within 10 seconds of you starting the consumer, you need to add
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.name().toLowerCase());
Note - this only works on a brand new consumer group id / no existing committed offsets