I am working on a project that has two services: each reads messages from Kafka, transforms them, and writes them to another Kafka topic. The Kafka configuration for the two services is different. Here is my application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    sourcetopic1: topic1
    destinationtopic1: topic2
    sourcetopic2: topic3
    destinationtopic2: topic4
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: TestCollector
      client-id: TestCollector01
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
These are the Kafka config classes for the two services:
Service1KafkaConfig
@Configuration
public class Service1KafkaConfig {

    @Bean
    public ReceiverOptions<String, String> kafkaReceiverOptions(
            @Value("${spring.kafka.sourcetopic1}") String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions =
                ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(
            ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}
Service2KafkaConfig
@Configuration
public class Service2KafkaConfig {

    @Bean
    public ReceiverOptions<String, String> service2KafkaReceiverOptions(
            @Value("${spring.kafka.sourcetopic2}") String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, String> basicReceiverOptions =
                ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate(
            ReceiverOptions<String, String> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}
I autowire these beans into the respective services:
Service1: I haven't included the processRecord method for Service1 because I don't think it is relevant to this issue (a stripped-down stand-in is sketched after the class below). Please let me know if the real one is needed.
@Slf4j
@Service
public class Service1 implements CommandLineRunner {

    @Autowired
    public ReactiveKafkaConsumerTemplate<String, String> service1KafkaConsumerTemplate;

    public Flux<String> consume1() {
        return service1KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset()))
                .doOnNext(s -> processRecord(s))
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric1[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume1().subscribe();
    }
}
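For context, processRecord is only the transform step; a stripped-down stand-in (not the real implementation) looks like this:

    // Stand-in only: the real method transforms the message and writes it
    // to the destination topic.
    private void processRecord(ConsumerRecord<String, String> record) {
        log.info("processing key={} from topic={}", record.key(), record.topic());
    }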
Service2:
@Slf4j
@Service
public class Service2 implements CommandLineRunner {

    @Autowired
    @Qualifier("service2KafkaConsumerTemplate")
    public ReactiveKafkaConsumerTemplate<String, String> service2KafkaConsumerTemplate;

    public Flux<String> consume2() {
        return service2KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset()))
                .map(ConsumerRecord::value)
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume2().subscribe();
    }
}
When I run the application, I see only one of the consumers start, subscribed to topic1. Is it possible to have multiple Kafka consumers running in the same project? If yes, what needs to be done to get them both running?
It looks like they are both using the same topic and configuration; if the topic has only one partition and both consumers are in the same consumer group, only one of them will be assigned the partition and receive any data.
If you want them both to get the same data, you must put them in different consumer groups.
This works as expected:
@Bean
public ApplicationRunner runner1() {
    return args -> {
        ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ConsumerConfig.GROUP_ID_CONFIG, "group1",
                        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                .commitBatchSize(1)
                .subscription(Collections.singletonList("INPUT_1"));
        KafkaReceiver.create(ro)
                .receive()
                .doOnNext(record -> {
                    System.out.println("one: " + record.value() + "@" + record.offset());
                    record.receiverOffset().acknowledge();
                })
                .doOnError(System.out::println)
                .subscribe();
    };
}
@Bean
public ApplicationRunner runner2() {
    return args -> {
        ReceiverOptions<String, String> ro = ReceiverOptions.<String, String>create(
                Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ConsumerConfig.GROUP_ID_CONFIG, "group2",
                        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                .commitBatchSize(1)
                .subscription(Collections.singletonList("INPUT_1"));
        KafkaReceiver.create(ro)
                .receive()
                .doOnNext(record -> {
                    System.out.println("two: " + record.value() + "@" + record.offset());
                    record.receiverOffset().acknowledge();
                })
                .doOnError(System.out::println)
                .subscribe();
    };
}
Sending a single record to INPUT_1 then produces output from both consumers:

one: foo@16
two: foo@16
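Applied to your config classes, a minimal sketch of the same fix, assuming you keep KafkaProperties for the shared settings (the group and client ids TestCollector2/TestCollector02 here are just placeholders), is to override the group id on one service's ReceiverOptions so the two templates land in different consumer groups:

@Bean
public ReceiverOptions<String, String> service2KafkaReceiverOptions(
        @Value("${spring.kafka.sourcetopic2}") String topic, KafkaProperties kafkaProperties) {
    return ReceiverOptions.<String, String>create(kafkaProperties.buildConsumerProperties())
            // distinct ids so this consumer is not in the same group as service 1
            .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "TestCollector2")
            .consumerProperty(ConsumerConfig.CLIENT_ID_CONFIG, "TestCollector02")
            .subscription(Collections.singletonList(topic));
}

Any distinct group id works; the point is only that the two consumers must not share one if both are meant to receive the same records.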