I am doing a poc with kafka spring integration Java DSL I am reading a row from data base(DB) and send that row as msg to Kafka Topic. please find the code below. Code is compiling and i can able to fetch the record from DB, but i didn't see any msg in topic.
@Configuration
public class KafkaProduceConfig {
@Bean
public IntegrationFlow pollingAdapterFlow(EntityManagerFactory entityManagerFactory, MyTransformer transformer) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory).entityClass(MyRecord.class),
e -> e.poller(p -> p.cron("*/1 * * * * *").maxMessagesPerPoll(1).transactional())
.autoStartup(true))
.log(message -> "Polled DB Records from KafkaProduceConfig : " + message.getPayload())
.split()
.log(message -> "Record after split : " + message.getPayload())
.enrichHeaders(hrdSpec ->hrdSpec.headerExpression("myRecord", "payload",true))
.transform(transformer,"getCustomeRecord")
.enrichHeaders(hrdSpec ->hrdSpec.headerExpression("customeRecord","payload",true))
.log(message -> "Transformed Record : " + message.getPayload() +",topic :" +message.getHeaders().get("topic"))
.channel("sendToKafka")
.get();
}
@Bean
public IntegrationFlow outboundChannelAdapterFlow() {
return IntegrationFlow.from("sendToKafka")
.log(message -> "outboundChannelAdapterFlow received payload : " + message.getPayload() +",topic :"
+message.getHeaders().get("topic")+"key :"+message.getHeaders().get("key"))
.handle(m->Kafka.outboundChannelAdapter(producerFactory()).topic(m.getHeaders().get("topic").toString())
.messageKey(m.getHeaders().get("key").toString())
// .headerMapper(mapper())
.partitionId((Integer) m.getHeaders().get("partitionId")))
.get();
}
public ProducerFactory<Integer, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
}
Msg should publish to Kafka topic.
The configuration .handle(m->Kafka.outboundChannelAdapter(producerFactory())
is not correct. That lambda makes a new MessageHandler
which body is just to use that Kafka
factory whenever a new message arrives. This code just does not handle this message.
You must look into a handle()
variant where you provide a MessageHandler
from the factory, not a new by lambda.
So, something like this:
.handle(Kafka.outboundChannelAdapter(producerFactory())
.topic(m -> m.getHeaders().get("topic").toString())
.messageKey(m -> m.getHeaders().get("key").toString())
// .headerMapper(mapper())
.partitionId(m -> (Integer) m.getHeaders().get("partitionId")))
This way the MessageHandler
is going to be created during configuration phase. And at runtime its handleMessage()
method is going to be called against request message. All those option are now lambdas to be called at runtime.
P.S. Please, edit your question for more readable code snippets.