I have the following class. I have verified in the console that its constructor is called (during bean creation) before the topic placeholder in the Kafka listener is resolved:
public class MsgReceiver<MSG> extends AbstractMsgReceiver<MSG> implements
        MessageReceiver<MSG> {

    @SuppressWarnings("unused")
    private String topic;

    public MsgReceiver(String topic, MessageHandler<MSG> handler) {
        super(handler);
        this.topic = topic;
    }

    @KafkaListener(topics = "${my.messenger.kafka.topics.#{${topic}}.value}", groupId = "${my.messenger.kafka.topics.#{${topic}}.groupId}")
    public void receiveMessage(@Headers Map<String, Object> headers, @Payload MSG payload) {
        System.out.println("Received " + payload);
        super.receiveMessage(headers, payload);
    }
}
My application.yml is as follows:
my:
  messenger:
    kafka:
      address: localhost:9092
      topics:
        topic_1:
          value: my_topic
          groupId: 1
During bean creation, I pass "topic_1", which I want to be used dynamically inside the Kafka listener's topic placeholder. I tried the approach shown in the code above, but it does not work. Please suggest how to do this.
Placeholders are resolved before SpEL is evaluated, so you can't dynamically build a placeholder name using SpEL. You also can't reference fields like that; you have to do it indirectly via the bean name (and a public getter).
So, to do what you want, you have to add a getter and fetch the property dynamically from the environment after building the property name with SpEL.
There is a special token, __listener, which allows you to reference the current bean.
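To make the lookup concrete, here is a minimal plain-Java sketch of what the SpEL expression ends up doing: concatenate the property name from the bean's topic key, then fetch the value. The class name DynamicPropertyLookup, the lookup helper, and the Map standing in for Spring's Environment are all hypothetical and exist only for illustration; in the real application the Environment is populated from application.yml.

```java
import java.util.Map;

public class DynamicPropertyLookup {

    // Hypothetical stand-in for Spring's Environment, holding the flattened
    // keys that application.yml from the question would produce.
    static final Map<String, String> PROPS = Map.of(
            "my.messenger.kafka.topics.topic_1.value", "my_topic",
            "my.messenger.kafka.topics.topic_1.groupId", "1");

    // Mirrors the SpEL expression:
    // environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.<leaf>')
    static String lookup(String topicKey, String leaf) {
        String propertyName = "my.messenger.kafka.topics." + topicKey + "." + leaf;
        return PROPS.get(propertyName);
    }

    public static void main(String[] args) {
        System.out.println(lookup("topic_1", "value"));   // prints "my_topic"
        System.out.println(lookup("topic_1", "groupId")); // prints "1"
    }
}
```

The point of the sketch is that the property name is only known once the bean's topic value is available, which is why it has to be assembled at SpEL evaluation time rather than inside a ${...} placeholder.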
Putting it all together...
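import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
public class So63056065Application {

    public static void main(String[] args) {
        SpringApplication.run(So63056065Application.class, args);
    }

    // "topic_1" is the key under my.messenger.kafka.topics in application.yml
    @Bean
    public MyReceiver receiver() {
        return new MyReceiver("topic_1");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("my_topic").partitions(1).replicas(1).build();
    }

}

class MyReceiver {

    private final String topic;

    public MyReceiver(String topic) {
        this.topic = topic;
    }

    // Public getter so that SpEL can read __listener.topic
    public String getTopic() {
        return this.topic;
    }

    // Build the property name with SpEL, then look it up in the environment
    @KafkaListener(topics = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.value')}",
            groupId = "#{environment.getProperty('my.messenger.kafka.topics.' + __listener.topic + '.groupId')}")
    public void listen(String in) {
        System.out.println(in);
    }

}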
Result...
2020-07-23 12:13:44.932 INFO 39561 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id =
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 1
    group.instance.id = null
    ...
and
1: partitions assigned: [my_topic-0]