spring-boot apache-camel activemq-classic message-queue eai

Circuit breaker for asynchronous microservices?


There is an ActiveMQ queue (QueueA). A service (MyService) subscribes to it, processes each message and sends the result to another ActiveMQ queue (QueueB).

QueueA -> MyService -> QueueB

Consider a scenario where thousands of messages are in QueueA and, at the same time, QueueB is down. I want to stop processing if a certain number of messages (say 100) fail consecutively while being sent to QueueB. The check should use a rolling time window (say, 100 consecutive failures within 60 seconds), and when it trips, the service should stop consuming from QueueA. After 15 minutes or so, it should test whether QueueB is back up by sending one more message. If that still fails, it should stop consuming from QueueA for another 15 minutes.

Right now, all the messages error out and we have to reprocess every one of them. There is a recovery mechanism, but it gets overloaded because of the limitations of the current architecture.

Is there a pattern for this? Is it the same circuit breaker pattern that I know from the synchronous context? If so, I am not sure whether there is a ready-made solution in Java / Spring Boot / Apache Camel, which is the technology stack we are currently on. General guidelines for the pattern will also help, even if you do not know this specific technology platform.

I have also read the following question on Stack Overflow:

Is circuit breaker pattern applicable for asynchronous requests also?

Thanks and appreciate your time in helping me with this.


Solution

  • Have a look at the Camel RoutePolicy of type ThrottlingExceptionRoutePolicy, which is based on the CircuitBreakerLoadBalancer.

    With this policy, the route stops consuming from the endpoint while the circuit is in the open state (compare this with the standard circuit breaker behaviour: bypass the service call and fall back to another response).

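    import java.util.Arrays;
    import java.util.List;
    import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; // Camel 3.x; in Camel 2.x the class lives in org.apache.camel.impl

    @Bean
    public ThrottlingExceptionRoutePolicy myCustomPolicy() {
        // Only these exception types count as failures that can open the circuit;
        // with a null or empty list, every exception counts.
        List<Class<?>> handledExceptions = Arrays.asList(MyException.class);
        // failureThreshold = number of failures; failureWindow and halfOpenAfter are in milliseconds
        return new ThrottlingExceptionRoutePolicy(failureThreshold, failureWindow, halfOpenAfter, handledExceptions);
    }

    // Inside RouteBuilder.configure(); myCustomPolicy is the bean above, injected into the route builder
    from("jms:queue:QueueA")
        .routePolicy(myCustomPolicy)
        .to("mock:MyService");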
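
    If you are not on Camel, or just want to see what such a policy has to keep track of, here is a minimal plain-Java sketch of the closed / open / half-open state machine using the numbers from the question (100 failures, 60 seconds, 15 minutes). The class name ConsumerCircuitBreaker and its methods are hypothetical, made up for illustration; this is not Camel's implementation, and the caller is assumed to pause and resume its own consumer based on allowConsume.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of a consumer-side circuit breaker; not Camel's code. */
final class ConsumerCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long failureWindowMillis;
    private final long halfOpenAfterMillis;
    // Timestamps of the failures seen since the last success
    private final Deque<Long> failureTimes = new ArrayDeque<>();
    private State state = State.CLOSED;
    private long openedAt;

    ConsumerCircuitBreaker(int failureThreshold, long failureWindowMillis, long halfOpenAfterMillis) {
        this.failureThreshold = failureThreshold;
        this.failureWindowMillis = failureWindowMillis;
        this.halfOpenAfterMillis = halfOpenAfterMillis;
    }

    /** Returns true if the next message may be taken from QueueA at time 'now'. */
    synchronized boolean allowConsume(long now) {
        if (state == State.OPEN && now - openedAt >= halfOpenAfterMillis) {
            state = State.HALF_OPEN; // let exactly one probe message through
            return true;
        }
        return state == State.CLOSED;
    }

    /** Call after a message was sent to QueueB successfully. */
    synchronized void recordSuccess() {
        failureTimes.clear(); // the run of consecutive failures is broken
        if (state == State.HALF_OPEN) {
            state = State.CLOSED; // probe succeeded, resume normal consumption
        }
    }

    /** Call after sending to QueueB failed at time 'now'. */
    synchronized void recordFailure(long now) {
        if (state == State.OPEN) {
            return; // late failure from an in-flight message; already open
        }
        if (state == State.HALF_OPEN) {
            open(now); // probe failed, wait another halfOpenAfterMillis
            return;
        }
        failureTimes.addLast(now);
        // Drop failures that have fallen out of the rolling window
        while (!failureTimes.isEmpty() && now - failureTimes.peekFirst() > failureWindowMillis) {
            failureTimes.removeFirst();
        }
        if (failureTimes.size() >= failureThreshold) {
            open(now);
        }
    }

    private void open(long now) {
        state = State.OPEN;
        openedAt = now;
        failureTimes.clear();
    }

    synchronized State state() {
        return state;
    }
}
```

    In the message listener you would check allowConsume(System.currentTimeMillis()) before taking a message, and call recordSuccess() or recordFailure(System.currentTimeMillis()) after the send to QueueB. Timestamps are passed in rather than read from the clock so that the logic is easy to unit test.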