javaspring-bootspring-cloudspring-cloud-stream

Spring Cloud Stream: "Failed to locate function" warning when using custom Consumer abstraction


I am encountering a warning when starting my application using Spring Boot 3.5.8 and Spring Cloud Stream 2025.0.0.

To provide some context, I created an abstraction over Consumer to handle boilerplate code, specifically for MDC setup and centralized error handling.

Here is my core interface:

public interface StreamConsumer<IN> extends Consumer<Message<IN>> {
    void process(MessageHeaders headers, IN input) throws Exception;
    default void setupLogging(MessageHeaders headers, IN input) {}
    default void onError(MessageHeaders headers, IN input, Exception e) {}
}

And the abstract class implementing it:

@Slf4j
public abstract class AbstractConsumer<IN> implements StreamConsumer<IN> {
    @Override
    public void accept(Message<IN> inMessage) {
        IN payload = inMessage.getPayload();
        MessageHeaders headers = inMessage.getHeaders();
        
        log.debug("Received message: {}", payload);
        log.debug("Received headers: {}", headers);
        
        try {
            setupLogging(headers, payload);
            process(headers, payload);
        } catch (Exception e) {
            log.error("Error processing message: {}", payload, e);
            onError(headers, payload, e);
            throw new RuntimeException(e);
        } finally {
            MDC.clear();
        }
    }
}

Finally, the specific implementation looks like this:

@Slf4j
@Component
public class LogTaskConsumer extends AbstractConsumer<Task> {

    @Override
    public void process(MessageHeaders headers, Task input) {
        // ... business logic
    }

    @Override
    public void setupLogging(MessageHeaders headers, Task input) {
        //MDC setup example...
        super.setupLogging(headers, input);
        Object id = headers.get("id");
        if (id != null) {
            MDC.put("id", id.toString());
        }
    }
}

My application.yaml configuration:

spring:
  cloud:
    function:
      definition: logTaskConsumer;...others
    stream:
      default-binder: rabbit
      binding-retry-interval: 3
      default:
        group: ${spring.application.name}
        content-type: application/json
        consumer:
          concurrency: 5
      bindings:
        logTaskConsumer-in-0:
          destination: xorch_task_log_2
          consumer:
            max-attempts: 5

The Issue Integration tests pass, the message reaches the consumer, and it is processed correctly. Everything works as expected functionally. However, I see the following warning in the logs at startup:

-2025-12-27 03:07:48.844 [] - WARN 26452 --- [ restartedMain] c.f.c.c.BeanFactoryAwareFunctionRegistry : Failed to locate function 'logTaskConsumer' for function definition 'logTaskConsumer'. Returning null.

I do not get this warning when I implement the consumer the "traditional" functional way (using a standard @Bean).

Based on my research, this seems related to Spring proxies preventing Spring Cloud Stream from correctly inferring the bean type or locating it in the registry, though it works at runtime due to fallback mechanisms.

My Question: How can I resolve this warning? I would really like to keep this abstraction as my team finds it comfortable and intuitive. However, if you believe this pattern is not recommended for Spring Cloud Stream, what would be the best alternative approach?


Solution

  • This is a known type inference issue in Spring Cloud Function. The BeanFactoryAwareFunctionRegistry uses FunctionTypeUtils.discoverFunctionType() which struggles to resolve generic types through deep inheritance hierarchies like LogTaskConsumer -> AbstractConsumer<Task> -> StreamConsumer<Task> -> Consumer<Message<Task>>

    Please try registering explicitly your customer with type information via FunctionRegistration.

    @Configuration
    public class StreamConsumerConfig {
    
        @Bean
        public FunctionRegistration<Consumer<Message<Task>>> logTaskConsumer(
                LogTaskConsumer consumer) {
            return new FunctionRegistration<>(consumer)
                .type(FunctionTypeUtils.consumerType(
                    ResolvableType.forClassWithGenerics(
                        Message.class, Task.class
                    ).getType()
                ));
        }
    }
    

    And remove @Component from LogTaskConsumer (keep it as a regular bean).

    @Slf4j
    public class LogTaskConsumer extends AbstractConsumer<Task> {
        // same code here
    }
    
    @Bean
    public LogTaskConsumer logTaskConsumerInstance() {
        return new LogTaskConsumer();
    }