I am encountering a warning when starting my application using Spring Boot 3.5.8 and Spring Cloud Stream 2025.0.0.
To provide some context, I created an abstraction over Consumer to handle boilerplate code, specifically for MDC setup and centralized error handling.
Here is my core interface:
public interface StreamConsumer<IN> extends Consumer<Message<IN>> {
    void process(MessageHeaders headers, IN input) throws Exception;
    default void setupLogging(MessageHeaders headers, IN input) {}
    default void onError(MessageHeaders headers, IN input, Exception e) {}
}
And the abstract class implementing it:
@Slf4j
public abstract class AbstractConsumer<IN> implements StreamConsumer<IN> {
    @Override
    public void accept(Message<IN> inMessage) {
        IN payload = inMessage.getPayload();
        MessageHeaders headers = inMessage.getHeaders();
        log.debug("Received message: {}", payload);
        log.debug("Received headers: {}", headers);
        try {
            setupLogging(headers, payload);
            process(headers, payload);
        } catch (Exception e) {
            log.error("Error processing message: {}", payload, e);
            onError(headers, payload, e);
            throw new RuntimeException(e);
        } finally {
            MDC.clear();
        }
    }
}
Finally, the specific implementation looks like this:
@Slf4j
@Component
public class LogTaskConsumer extends AbstractConsumer<Task> {
    @Override
    public void process(MessageHeaders headers, Task input) {
        // ... business logic
    }

    @Override
    public void setupLogging(MessageHeaders headers, Task input) {
        // MDC setup example...
        super.setupLogging(headers, input);
        Object id = headers.get("id");
        if (id != null) {
            MDC.put("id", id.toString());
        }
    }
}
My application.yaml configuration:
spring:
  cloud:
    function:
      definition: logTaskConsumer;...others
    stream:
      default-binder: rabbit
      binding-retry-interval: 3
      default:
        group: ${spring.application.name}
        content-type: application/json
        consumer:
          concurrency: 5
      bindings:
        logTaskConsumer-in-0:
          destination: xorch_task_log_2
          consumer:
            max-attempts: 5
The Issue: Integration tests pass, the message reaches the consumer, and it is processed correctly. Functionally, everything works as expected. However, I see the following warning in the logs at startup:
-2025-12-27 03:07:48.844 [] - WARN 26452 --- [ restartedMain] c.f.c.c.BeanFactoryAwareFunctionRegistry : Failed to locate function 'logTaskConsumer' for function definition 'logTaskConsumer'. Returning null.
I do not get this warning when I implement the consumer the "traditional" functional way (using a standard @Bean).
Based on my research, this seems related to Spring proxies preventing Spring Cloud Stream from correctly inferring the bean type or locating it in the registry, though it works at runtime due to fallback mechanisms.
My Question: How can I resolve this warning? I would really like to keep this abstraction as my team finds it comfortable and intuitive. However, if you believe this pattern is not recommended for Spring Cloud Stream, what would be the best alternative approach?
This is a known type inference issue in Spring Cloud Function. BeanFactoryAwareFunctionRegistry uses FunctionTypeUtils.discoverFunctionType(), which struggles to resolve generic types through deep inheritance hierarchies such as LogTaskConsumer -> AbstractConsumer<Task> -> StreamConsumer<Task> -> Consumer<Message<Task>>.
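To illustrate why such a hierarchy is awkward for type discovery, here is a minimal sketch using plain Java reflection only. It does not reproduce what FunctionTypeUtils does internally; Message and Task are hypothetical stand-ins for the real types, and GenericHierarchyDemo is just a name for the example. It shows that the concrete class declares no interfaces itself, that Consumer's argument is only known as Message<IN> with an unresolved type variable, and that Task appears solely on the generic superclass.

```java
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.function.Consumer;

public class GenericHierarchyDemo {

    // Hypothetical stand-ins for Spring's Message and the question's Task.
    interface Message<T> { T getPayload(); }
    static class Task { }

    // Same shape as the hierarchy in the question, reduced to its generics.
    interface StreamConsumer<IN> extends Consumer<Message<IN>> { }
    static abstract class AbstractConsumer<IN> implements StreamConsumer<IN> {
        @Override public void accept(Message<IN> message) { }
    }
    static class LogTaskConsumer extends AbstractConsumer<Task> { }

    /** Interfaces declared directly on the concrete class (there are none). */
    public static String directInterfaces() {
        return Arrays.toString(LogTaskConsumer.class.getGenericInterfaces());
    }

    /** Consumer's type argument as declared on StreamConsumer: still uses IN. */
    public static String consumerArgument() {
        ParameterizedType consumer =
                (ParameterizedType) StreamConsumer.class.getGenericInterfaces()[0];
        return consumer.getActualTypeArguments()[0].getTypeName();
    }

    /** The only place Task shows up: the generic superclass of LogTaskConsumer. */
    public static String superclassArgument() {
        ParameterizedType superType =
                (ParameterizedType) LogTaskConsumer.class.getGenericSuperclass();
        return superType.getActualTypeArguments()[0].getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(directInterfaces());   // prints "[]"
        System.out.println(consumerArgument());   // a Message<IN> type, IN unresolved
        System.out.println(superclassArgument()); // the Task class
    }
}
```

Any resolver therefore has to walk three levels and substitute IN with Task along the way before it can conclude that the bean is a Consumer<Message<Task>>. Supplying the type explicitly sidesteps that walk.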
Please try explicitly registering your consumer with its type information via FunctionRegistration.
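import java.util.function.Consumer;

import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;

@Configuration
public class StreamConsumerConfig {
    // The bean name "logTaskConsumer" matches the function definition in application.yaml.
    @Bean
    public FunctionRegistration<Consumer<Message<Task>>> logTaskConsumer(
            LogTaskConsumer consumer) {
        // Declare the function type explicitly as Consumer<Message<Task>>.
        return new FunctionRegistration<>(consumer)
                .type(FunctionTypeUtils.consumerType(
                        ResolvableType.forClassWithGenerics(
                                Message.class, Task.class
                        ).getType()
                ));
    }
}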
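Also remove @Component from LogTaskConsumer and declare it as a regular bean under a different name. Otherwise its default bean name, logTaskConsumer, would clash with the FunctionRegistration bean above.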
@Slf4j
public class LogTaskConsumer extends AbstractConsumer<Task> {
    // same code here
}

// Inside a @Configuration class, e.g. StreamConsumerConfig:
@Bean
public LogTaskConsumer logTaskConsumerInstance() {
    return new LogTaskConsumer();
}