spring-cloud-streamspring-cloud-stream-binder-kafkaspring-cloud-functionspring-cloud-stream-binder

Same payload across two different streams not mapping correctly via Spring Cloud Stream


I am attempting to map two separate Kafka streams that have the same message payload type to two different Consumer beans that both use the same service implementation class using Spring Cloud Stream.

My code/configuration is similar to the following:

@Bean
public Consumer<MyPayload> myPayloadConsumerFromTopic1(MyPayloadProcessor myPayloadProcessor) {
  return myPayloadProcessor::processMyPayload;
}

@Bean
public Consumer<MyPayload> myPayloadConsumerFromTopic2(MyPayloadProcessor myPayloadProcessor) {
  return myPayloadProcessor::processMyPayload;
}

spring:
  cloud:
    function:
      definition: myPayloadConsumerFromTopic1,myPayloadConsumerFromTopic2
    stream:
      bindings:
        myPayloadConsumerFromTopic1-in-0: 
          destination: topic-1
          group: myPayloadConsumerFromTopic1Group
          ...
        myPayloadConsumerFromTopic2-in-0: 
          destination: topic-2
          group: myPayloadConsumerFromTopic2Group
          ...

However, when my microservice starts, I receive the following error:

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'my-payload-processor-1.myPayloadConsumerFromTopic1myPayloadConsumerFromTopic2-in-0'

What is the cause of this error? Can I not have two separate Kafka topics that contain the same payload type mapped into a Spring Cloud Stream microservice with the code/configuration listed above? If not, what additional code and/or configuration do I need to ensure that I can do this?


Solution

  • For anyone who may run into this same issue and is looking for how to solve it, here's the fix.

    spring:
      cloud:
        function:
          definition: myPayloadConsumerFromTopic1;myPayloadConsumerFromTopic2
    

    Note that previously I was using commas to separate the function definitions, whereas now I am using semicolons. That fixed this issue.