javaspringspring-bootspring-cloud-streamspring-rabbit

Spring Cloud Stream 4.0.4 Producer error Channel Enabled property not working


I am trying to understand how exactly the errorChannelEnabled property works for Spring Cloud Stream. I am using version 4.0.4 of Spring Cloud Stream and rabbit binder.

Based on this answer, I understand that there is a global error channel named errorChannel and a per binding error channel for producers which is created when this errorChannelEnabled property is set to true. The name of this channel is <destination>.<group>.errors.

However, this does not seem to work for this simple application:

package com.example.rabbittests;

import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.ErrorMessage;

@SpringBootApplication
public class RabbitTestsApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitTestsApplication.class, args);
    }

    @Bean
    public Supplier<String> supplier_err() {
        return () -> {
            System.out.println("Sending msg...");
            throw new RuntimeException("An error occurred...");
        };
    }

    @ServiceActivator(inputChannel = "atest1.g1.errors")
    public void processError(ErrorMessage msg) {
        System.out.println("My test error: " + msg);
    }
}

Following is my application.properties file:

spring.cloud.function.definition=supplier_err
spring.cloud.stream.bindings.supplier_err-out-0.destination=atest1
spring.cloud.stream.bindings.supplier_err-out-0.group=g1
spring.cloud.stream.bindings.supplier_err-out-0.producer.error-channel-enabled=true

It looks like it is creating unknown.channel.name channel for error channel as in the logs, the following line is printed:

2023-08-28T14:17:11.690+05:30  INFO 20708 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'unknown.channel.name' has 1 subscriber(s).

The relevant logs are:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.1.2)

2023-08-28T14:26:44.961+05:30  INFO 3016 --- [           main] c.e.rabbittests.RabbitTestsApplication   : Starting RabbitTestsApplication using Java 17.0.7 with PID 3016 (C:\Users\Aadish Jain\Desktop\rabbit-tests\target\classes started by Aadish Jain in C:\Users\Aadish Jain\Desktop\rabbit-tests)
2023-08-28T14:26:44.966+05:30  INFO 3016 --- [           main] c.e.rabbittests.RabbitTestsApplication   : No active profile set, falling back to 1 default profile: "default"
2023-08-28T14:26:45.907+05:30  INFO 3016 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-08-28T14:26:45.916+05:30  INFO 3016 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-08-28T14:26:46.098+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.109+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.stream.config.BindersHealthIndicatorAutoConfiguration' of type [org.springframework.cloud.stream.config.BindersHealthIndicatorAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.112+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'bindersHealthContributor' of type [org.springframework.cloud.stream.config.BindersHealthIndicatorAutoConfiguration$BindersHealthContributor] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.113+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'bindersHealthIndicatorListener' of type [org.springframework.cloud.stream.config.BindersHealthIndicatorAutoConfiguration$BindersHealthIndicatorListener] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.116+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration' of type [org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.117+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'rabbitExtendedPropertiesDefaultMappingsProvider' of type [org.springframework.cloud.stream.binder.rabbit.config.ExtendedBindingHandlerMappingsProviderConfiguration$$Lambda$434/0x000000080042a250] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.119+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'BindingHandlerAdvise' of type [org.springframework.cloud.stream.config.BindingHandlerAdvise] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.129+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'spelConverter' of type [org.springframework.cloud.stream.config.SpelExpressionConverterConfiguration$SpelConverter] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.151+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'spring.jmx-org.springframework.boot.autoconfigure.jmx.JmxProperties' of type [org.springframework.boot.autoconfigure.jmx.JmxProperties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.160+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:46.218+05:30  INFO 3016 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2023-08-28T14:26:47.202+05:30  INFO 3016 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 1 endpoint(s) beneath base path '/actuator'
2023-08-28T14:26:47.653+05:30  WARN 3016 --- [           main] ocalVariableTableParameterNameDiscoverer : Using deprecated '-debug' fallback for parameter name resolution. Compile the affected code with '-parameters' instead or avoid its introspection: org.springframework.amqp.rabbit.core.RabbitAdmin
2023-08-28T14:26:47.703+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel errorChannel
2023-08-28T14:26:47.711+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel supplier_err-out-0
2023-08-28T14:26:47.720+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel nullChannel
2023-08-28T14:26:47.724+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel supplier_err_integrationflow.channel#0
2023-08-28T14:26:47.730+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler supplier_err_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2023-08-28T14:26:47.739+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler _org.springframework.integration.errorLogger
2023-08-28T14:26:47.745+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageSource supplier_err-out-0_spca
2023-08-28T14:26:47.749+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering Endpoint org.springframework.integration:type=ManagedEndpoint,name="supplier_err-out-0_spca.adapter",bean=endpoint
2023-08-28T14:26:47.771+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel atest1.g1.errors
2023-08-28T14:26:47.779+05:30  INFO 3016 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler rabbitTestsApplication.processError.serviceActivator
2023-08-28T14:26:47.791+05:30  INFO 3016 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-08-28T14:26:47.793+05:30  INFO 3016 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-08-28T14:26:47.795+05:30  INFO 3016 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-08-28T14:26:47.796+05:30  INFO 3016 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {router} as a subscriber to the 'supplier_err_integrationflow.channel#0' channel
2023-08-28T14:26:47.797+05:30  INFO 3016 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.supplier_err_integrationflow.channel#0' has 1 subscriber(s).
2023-08-28T14:26:47.797+05:30  INFO 3016 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'supplier_err_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'    
2023-08-28T14:26:47.798+05:30  INFO 3016 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {service-activator:rabbitTestsApplication.processError.serviceActivator} as a subscriber to the 'atest1.g1.errors' channel
2023-08-28T14:26:47.798+05:30  INFO 3016 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.atest1.g1.errors' has 1 subscriber(s).
2023-08-28T14:26:47.798+05:30  INFO 3016 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean 'rabbitTestsApplication.processError.serviceActivator'
2023-08-28T14:26:47.801+05:30  INFO 3016 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: rabbit
2023-08-28T14:26:47.801+05:30  INFO 3016 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Constructing binder child context for rabbit
2023-08-28T14:26:47.890+05:30  INFO 3016 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: rabbit
2023-08-28T14:26:47.933+05:30  INFO 3016 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-08-28T14:26:48.008+05:30  INFO 3016 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#539c4830:0/SimpleConnection@ce2eaa7 [delegate=amqp://guest@127.0.0.1:5672/, localPort=56110]
2023-08-28T14:26:48.038+05:30  INFO 3016 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'unknown.channel.name' has 1 subscriber(s).
2023-08-28T14:26:48.048+05:30  WARN 3016 --- [           main] o.s.c.s.b.r.RabbitMessageChannelBinder   : Producer error channel is enabled, but the connection factory is not configured for returns or confirms; the error channel will receive no messages
2023-08-28T14:26:48.051+05:30  WARN 3016 --- [           main] o.s.i.a.outbound.AmqpOutboundEndpoint    : A confirm correlation expression is provided but the underlying connection factory does not support correlated delivery confirmations; no confirmations will be received
2023-08-28T14:26:48.051+05:30  WARN 3016 --- [           main] o.s.i.a.outbound.AmqpOutboundEndpoint    : A return channel is provided but the underlying connection factory does not support returned messages; none will be received
2023-08-28T14:26:48.052+05:30  INFO 3016 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.supplier_err-out-0' has 1 subscriber(s).
2023-08-28T14:26:48.058+05:30  INFO 3016 --- [           main] o.s.i.e.SourcePollingChannelAdapter      : started bean 'supplier_err-out-0_spca'
Sending msg...
2023-08-28T14:26:48.068+05:30 ERROR 3016 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:426)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)    
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)  
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: An error occurred...
        at com.example.rabbittests.RabbitTestsApplication.lambda$0(RabbitTestsApplication.java:22)
        at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:728)
        at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577)
        at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:50)
        at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:574)
        at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.get(SimpleFunctionRegistry.java:585)
        at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.get(PartitionAwareFunctionWrapper.java:105)
        at org.springframework.integration.dsl.IntegrationFlow$1.doReceive(IntegrationFlow.java:258)
        at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:443)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
        ... 13 more

Can someone explain why this code does not catch producer errors? Is there some other way to use errorChannelEnabled property?

Thanks


Solution

  • I found the issue Error channel name collision #2507. It seems the pattern <destination>.<group>.errors doesn't work anymore for the InputChannel according to this issue.

    I added a Consumer for error messages, as described in the section Spring Cloud Stream - Handle Error Messages, but this doesn't work either. Doesn't matter if I add the error-handler-definition for a specific binder or as default error handle. The consumer never receives any error. I think the spring.io documentation is bad, because the error-handler-definition works only for consumer errors and not for producer errors. To find that out, I had to read the SpringCloudAzure5x integration of spring from Microsoft. There they distinguish between outbound and inbound messages.

    The only way I can get it work is with the InputChannel errorChannel, but a global error channel is quite messy for us to handle.