In my project, I need to connect to two different Kafka brokers and consume all events from two topics, one topic on each broker.
My application.yaml looks roughly like this:
spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two # I changed this binder between kafka-one and kafka-two manually for tests, the orderProcessedListener-in-1 binding doesn't have the exclusive producer
        orderProcessedListener-in-0: # CONSUME FROM KAFKA ONE
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedListener-in-1: # CONSUMER FROM KAFKA TWO
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-1:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
But it didn't work when I ran the application: the consumer binding for Kafka One (orderProcessedListener-in-0) works normally, but the consumer binding for Kafka Two (orderProcessedListener-in-1) does not.
I am using:
Both Kafka clusters are running fine in my development environment as Docker containers, one exposed on port 9092 and the other on port 9093.
Kafka One, with consumers subscribed on all topics:
Kafka Two, where no consumer ever subscribes on any topic:
How can I fix this?
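A simple way to solve this is to define two beans that both point to a single entry-point method. A plain Consumer has only one input binding (-in-0), so the orderProcessedListener-in-1 binding in the question is never created; giving each cluster its own bean gives each one its own -in-0 binding.
Example: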
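import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class OrderProcessedListener {

    // Shared entry point for messages from both clusters.
    public void consume(final Message<Order> message) {
        // your business logic to process message
    }

    // Bound to kafka-one through orderProcessedFromKafkaOneListener-in-0.
    @Bean
    public Consumer<Message<Order>> orderProcessedFromKafkaOneListener() {
        return this::consume;
    }

    // Bound to kafka-two through orderProcessedFromKafkaTwoListener-in-0.
    @Bean
    public Consumer<Message<Order>> orderProcessedFromKafkaTwoListener() {
        return this::consume;
    }
}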
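If the shared logic needs to know which cluster a message came from, the same delegation idea can carry a source tag. The following is only a plain-Java sketch without Spring: the class name SharedHandlerSketch, the listenerFor factory, and the String payloads are all hypothetical stand-ins, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch (no Spring): two consumers share one entry point.
class SharedHandlerSketch {

    // Records what the shared handler saw, so the behavior is observable.
    final List<String> handled = new ArrayList<>();

    // Single entry point; `source` is a hypothetical tag naming the cluster.
    void consume(String source, String payload) {
        handled.add(source + ":" + payload);
    }

    // Hypothetical factory: one listener per cluster, all delegating to consume().
    Consumer<String> listenerFor(String source) {
        return payload -> consume(source, payload);
    }

    public static void main(String[] args) {
        SharedHandlerSketch sketch = new SharedHandlerSketch();
        Consumer<String> fromKafkaOne = sketch.listenerFor("kafka-one");
        Consumer<String> fromKafkaTwo = sketch.listenerFor("kafka-two");

        fromKafkaOne.accept("order-1");
        fromKafkaTwo.accept("order-2");

        // prints [kafka-one:order-1, kafka-two:order-2]
        System.out.println(sketch.handled);
    }
}
```

In the Spring version, each @Bean method would return the equivalent of listenerFor(...) for its own cluster instead of this::consume.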
The properties configuration should be:
spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedFromKafkaOneListener;orderProcessedFromKafkaTwoListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedFromKafkaOneListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedFromKafkaTwoListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedFromKafkaOneListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedFromKafkaTwoListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
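The binding keys in this configuration follow the functional binding naming convention, where each function listed in spring.cloud.function.definition gets an input binding named after it with the suffix -in-0. As a rough illustration only, the plain-Java sketch below derives those names from the definition string; BindingNamesSketch and inputBindings are hypothetical helpers, not a Spring API.

```java
import java.util.ArrayList;
import java.util.List;

class BindingNamesSketch {

    // Hypothetical helper: expected input binding name for each function
    // listed in a ';'-separated function definition.
    static List<String> inputBindings(String definition) {
        List<String> names = new ArrayList<>();
        for (String function : definition.split(";")) {
            names.add(function.trim() + "-in-0");
        }
        return names;
    }

    public static void main(String[] args) {
        String definition =
            "orderCreatedListener;orderProcessedFromKafkaOneListener;orderProcessedFromKafkaTwoListener";
        // Prints one binding name per line; these should match the keys
        // under spring.cloud.stream.bindings.
        inputBindings(definition).forEach(System.out::println);
    }
}
```

If a key under spring.cloud.stream.bindings does not match one of these names (or an -out-0 counterpart for functions that return a value), its binder setting is not applied to any function.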
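That said, the best solution for this is generally to use reactive functions: as documented, Spring Cloud Stream supports functions with multiple inputs (-in-0, -in-1, and so on) only when they are reactive and take a Reactor tuple of Flux streams, so a single function can then be bound to both binders.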