I am trying to understand exactly how the errorChannelEnabled property works in Spring Cloud Stream. I am using version 4.0.4 of Spring Cloud Stream with the Rabbit binder.
Based on this answer, I understand that there is a global error channel named errorChannel, plus a per-binding error channel for producers that is created when errorChannelEnabled is set to true. The name of that channel is <destination>.<group>.errors.
However, this does not seem to work for this simple application:
package com.example.rabbittests;

import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.ErrorMessage;

@SpringBootApplication
public class RabbitTestsApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitTestsApplication.class, args);
    }

    // Supplier that always fails, to provoke a producer-side error.
    @Bean
    public Supplier<String> supplier_err() {
        return () -> {
            System.out.println("Sending msg...");
            throw new RuntimeException("An error occurred...");
        };
    }

    // Expected to receive errors from the per-binding producer error channel.
    @ServiceActivator(inputChannel = "atest1.g1.errors")
    public void processError(ErrorMessage msg) {
        System.out.println("My test error: " + msg);
    }
}
Following is my application.properties file:
spring.cloud.function.definition=supplier_err
spring.cloud.stream.bindings.supplier_err-out-0.destination=atest1
spring.cloud.stream.bindings.supplier_err-out-0.group=g1
spring.cloud.stream.bindings.supplier_err-out-0.producer.error-channel-enabled=true
It looks like the error channel is being created under the name unknown.channel.name, because the following line is printed in the logs:
2023-08-28T14:17:11.690+05:30 INFO 20708 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
The relevant logs are:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.1.2)
2023-08-28T14:26:44.961+05:30 INFO 3016 --- [ main] c.e.rabbittests.RabbitTestsApplication : Starting RabbitTestsApplication using Java 17.0.7 with PID 3016 (C:\Users\Aadish Jain\Desktop\rabbit-tests\target\classes started by Aadish Jain in C:\Users\Aadish Jain\Desktop\rabbit-tests)
2023-08-28T14:26:44.966+05:30 INFO 3016 --- [ main] c.e.rabbittests.RabbitTestsApplication : No active profile set, falling back to 1 default profile: "default"
2023-08-28T14:26:45.907+05:30 INFO 3016 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-08-28T14:26:45.916+05:30 INFO 3016 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-08-28T14:26:46.098+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.109+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.stream.config.BindersHealthIndicatorAutoConfiguration' of type [org.springframework.cloud.stream.config.BindersHealthIndicatorAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.112+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'bindersHealthContributor' of type [org.springframework.cloud.stream.config.BindersHealthIndicatorAutoConfiguration$BindersHealthContributor] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.113+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'bindersHealthIndicatorListener' of type [org.springframework.cloud.stream.config.BindersHealthIndicatorAutoConfiguration$BindersHealthIndicatorListener] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.116+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration' of type [org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.117+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'rabbitExtendedPropertiesDefaultMappingsProvider' of type [org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration$$Lambda$434/0x000000080042a250] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.119+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'BindingHandlerAdvise' of type [org.springframework.cloud.stream.config.BindingHandlerAdvise] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.129+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'spelConverter' of type [org.springframework.cloud.stream.config.SpelExpressionConverterConfiguration$SpelConverter] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.151+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'spring.jmx-org.springframework.boot.autoconfigure.jmx.JmxProperties' of type [org.springframework.boot.autoconfigure.jmx.JmxProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.160+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.218+05:30 INFO 3016 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:47.202+05:30 INFO 3016 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 1 endpoint(s) beneath base path '/actuator'
2023-08-28T14:26:47.653+05:30 WARN 3016 --- [ main] ocalVariableTableParameterNameDiscoverer : Using deprecated '-debug' fallback for parameter name resolution. Compile the affected code with '-parameters' instead or avoid its introspection: org.springframework.amqp.rabbit.core.RabbitAdmin
2023-08-28T14:26:47.703+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel
2023-08-28T14:26:47.711+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel supplier_err-out-0
2023-08-28T14:26:47.720+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel nullChannel
2023-08-28T14:26:47.724+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel supplier_err_integrationflow.channel#0
2023-08-28T14:26:47.730+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler supplier_err_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2023-08-28T14:26:47.739+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler _org.springframework.integration.errorLogger
2023-08-28T14:26:47.745+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageSource supplier_err-out-0_spca
2023-08-28T14:26:47.749+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering Endpoint org.springframework.integration:type=ManagedEndpoint,name="supplier_err-out-0_spca.adapter",bean=endpoint
2023-08-28T14:26:47.771+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel atest1.g1.errors
2023-08-28T14:26:47.779+05:30 INFO 3016 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler rabbitTestsApplication.processError.serviceActivator
2023-08-28T14:26:47.791+05:30 INFO 3016 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-08-28T14:26:47.793+05:30 INFO 3016 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-08-28T14:26:47.795+05:30 INFO 3016 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-08-28T14:26:47.796+05:30 INFO 3016 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {router} as a subscriber to the 'supplier_err_integrationflow.channel#0' channel
2023-08-28T14:26:47.797+05:30 INFO 3016 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.supplier_err_integrationflow.channel#0' has 1 subscriber(s).
2023-08-28T14:26:47.797+05:30 INFO 3016 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'supplier_err_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2023-08-28T14:26:47.798+05:30 INFO 3016 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {service-activator:rabbitTestsApplication.processError.serviceActivator} as a subscriber to the 'atest1.g1.errors' channel
2023-08-28T14:26:47.798+05:30 INFO 3016 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.atest1.g1.errors' has 1 subscriber(s).
2023-08-28T14:26:47.798+05:30 INFO 3016 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'rabbitTestsApplication.processError.serviceActivator'
2023-08-28T14:26:47.801+05:30 INFO 3016 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: rabbit
2023-08-28T14:26:47.801+05:30 INFO 3016 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Constructing binder child context for rabbit
2023-08-28T14:26:47.890+05:30 INFO 3016 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: rabbit
2023-08-28T14:26:47.933+05:30 INFO 3016 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-08-28T14:26:48.008+05:30 INFO 3016 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#539c4830:0/SimpleConnection@ce2eaa7 [delegate=amqp://guest@127.0.0.1:5672/, localPort=56110]
2023-08-28T14:26:48.038+05:30 INFO 3016 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
2023-08-28T14:26:48.048+05:30 WARN 3016 --- [ main] o.s.c.s.b.r.RabbitMessageChannelBinder : Producer error channel is enabled, but the connection factory is not configured for returns or confirms; the error channel will receive no messages
2023-08-28T14:26:48.051+05:30 WARN 3016 --- [ main] o.s.i.a.outbound.AmqpOutboundEndpoint : A confirm correlation expression is provided but the underlying connection factory does not support correlated delivery confirmations; no confirmations will be received
2023-08-28T14:26:48.051+05:30 WARN 3016 --- [ main] o.s.i.a.outbound.AmqpOutboundEndpoint : A return channel is provided but the underlying connection factory does not support returned messages; none will be received
2023-08-28T14:26:48.052+05:30 INFO 3016 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.supplier_err-out-0' has 1 subscriber(s).
2023-08-28T14:26:48.058+05:30 INFO 3016 --- [ main] o.s.i.e.SourcePollingChannelAdapter : started bean 'supplier_err-out-0_spca'
Sending msg...
2023-08-28T14:26:48.068+05:30 ERROR 3016 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:426)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: An error occurred...
at com.example.rabbittests.RabbitTestsApplication.lambda$0(RabbitTestsApplication.java:22)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:728)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577)
at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:50)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:574)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.get(SimpleFunctionRegistry.java:585)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.get(PartitionAwareFunctionWrapper.java:105)
at org.springframework.integration.dsl.IntegrationFlow$1.doReceive(IntegrationFlow.java:258)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:443)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
... 13 more
Can someone explain why this code does not catch producer errors? Is there some other way to use the errorChannelEnabled property?
Thanks
I found the issue "Error channel name collision #2507". According to that issue, the pattern <destination>.<group>.errors no longer works as an inputChannel.
I added a Consumer for error messages, as described in the section "Spring Cloud Stream - Handle Error Messages", but this doesn't work either. It doesn't matter whether I set error-handler-definition for a specific binding or as the default error handler: the consumer never receives any error.
I think the spring.io documentation is unclear here, because error-handler-definition works only for consumer errors and not for producer errors. To find that out, I had to read Microsoft's SpringCloudAzure5x integration for Spring, which distinguishes between outbound and inbound messages.
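The WARN lines in the question's log seem to point in the same direction: the binder says the producer error channel "will receive no messages" because the connection factory is not configured for returns or confirms. A sketch of the Spring Boot properties that enable them is below. This is my reading of what the warning asks for, not something I have verified against this application. As far as I can tell it would only surface failed sends (negative confirms and returned messages) and not an exception thrown inside the Supplier itself, since that exception happens before anything is sent.

```properties
# Assumption: standard Spring Boot RabbitMQ properties, added to the question's application.properties
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
```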
The only way I can get it to work is with the inputChannel errorChannel, but a global error channel is quite messy for us to handle.
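One way to make the global channel less messy might be to dispatch on the type of the underlying exception. The following is only a plain-Java sketch with no Spring classes in it; ErrorDispatcher and its methods are hypothetical names I made up for illustration. It walks the cause chain of a Throwable and calls the first handler registered for a matching type, falling back to a default handler otherwise.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper: routes a Throwable to a handler chosen by exception type. */
public class ErrorDispatcher {

    // Insertion order is kept, so handlers registered earlier are checked first.
    private final Map<Class<? extends Throwable>, Consumer<Throwable>> handlers = new LinkedHashMap<>();
    private final Consumer<Throwable> fallback;

    public ErrorDispatcher(Consumer<Throwable> fallback) {
        this.fallback = fallback;
    }

    /** Registers a handler for the given exception type (and its subclasses). */
    public ErrorDispatcher on(Class<? extends Throwable> type, Consumer<Throwable> handler) {
        handlers.put(type, handler);
        return this;
    }

    /** Walks from the outermost exception inwards and calls the first matching handler. */
    public void dispatch(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, Consumer<Throwable>> e : handlers.entrySet()) {
                if (e.getKey().isInstance(t)) {
                    e.getValue().accept(t);
                    return;
                }
            }
        }
        // Nothing in the cause chain matched a registered type.
        fallback.accept(error);
    }
}
```

In the application this would be called from the @ServiceActivator on errorChannel with msg.getPayload(), which is the Throwable carried by the ErrorMessage. Registering a broad type such as RuntimeException would match the outer wrapper exception first, so more specific application exception types are probably the better keys.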