spring-bootkotlinspring-securityspring-webfluxspring-cloud-gateway

weblux with retry and circuitBreaker return premature response or error response


api-gateway auth request interceptor. That will retry or return default response based on response and response status also modifies response and sends back to client

@Configuration
class GatewayConfig(
    private val errorFilter: GlobalErrorFilter,
){

@Bean
    fun authRequestFilter(
        builder: RouteLocatorBuilder,
    ) = builder.routes()
        .route(authService.name) { r ->
            r.path(authService.path)
                .filters { f ->
                    f.retry { retryConfig ->
                        retryConfig.apply {
                            setStatuses(
                                HttpStatus.PRECONDITION_FAILED, // validation token verification failure
                                HttpStatus.BAD_REQUEST
                            )
                            retries = RETRY_ATTEMPTS.toInt()
                            setBackoff(
                                INITIAL_RETRY_DELAY,
                                MAX_DELAY_BETWEEN_RETRY,
                                RETRY_FACTOR,
                                false
                            )
                            setMethods( // for now retry only on these methods
                                HttpMethod.GET,
                                HttpMethod.POST
                            )
                            setExceptions(
                                ConnectException::class.java,
                            )
                        }
                    }.circuitBreaker { config ->
                        config.setName("fallback") // this name is taken from the application.properties file. MUST NOT BE CHANGED
                            .setStatusCodes( // set of status code that will cause circuit breaker to trigger
                                setOf(
                                    HttpStatus.INTERNAL_SERVER_ERROR.value().toString()
                                )
                            )
                            .setFallbackUri("forward:${Endpoint.FALLBACK}") // fallback route for circuit breaker
                    }.filter(
                        errorFilter,
                        FILTER_ORDER // -2
                    ).modifyResponseBody( // modify response from user-service
                        ResponseWrapper::class.java,
                        Any::class.java
                    ) { exchange, response ->
                        if (response.status == ResponseStatus.SUCCESS && response.payload != null)
                            Mono.just(response.payload)
                        else Mono.just(
                            ResponseWrapper(
                                status = response.status,
                                payload = response.payload ?: response.status.value,
                            )
                        )
                    }
                }.uri(authService.uri)
        }
        .build()!!
}

A gloabal request gateway to handle all and any kind of error

@Component
@Primary
class GlobalErrorFilter(
    private val mapper: ObjectMapper,
) : GatewayFilter {
    override fun filter(
        exchange: ServerWebExchange,
        chain: GatewayFilterChain,
    ) = chain.filter(exchange)
        .onErrorResume { handleAuthenticationError(exchange, it) }
    
    fun handleAuthenticationError(
        exchange: ServerWebExchange,
        error: Throwable,
    )= when (error) {
        is NonRetryableAuthenticationException -> error.status to ResponseWrapper(
            status = error.responseStatus,
            payload = error.message ?: error.responseStatus.value
        )

        is RetryableAuthenticationException -> error.status to ResponseWrapper(
            status = error.responseStatus,
            payload = error.message ?: error.responseStatus.value
        )

        else -> {
            logger.error("Unexpected error: ${error.message}")

            HttpStatus.INTERNAL_SERVER_ERROR to ResponseWrapper(
                status = ResponseStatus.INTERNAL_SERVER_ERROR,
                payload = ResponseStatus.INTERNAL_SERVER_ERROR.value
            )
        }
    }.let { (statusCode, responseWrapper) ->
        writeErrorResponse(exchange, statusCode, responseWrapper)
    }

    private fun writeErrorResponse(
        exchange: ServerWebExchange,
        statusCode: HttpStatusCode,
        responseWrapper: ResponseWrapper<*>,
    ): Mono<Void> {
        exchange.response.statusCode = statusCode
        exchange.response.headers.contentType = MediaType.APPLICATION_JSON

        return exchange.response.writeWith(
            Mono.fromCallable { mapper.writeValueAsBytes(responseWrapper) }
                .map { exchange.response.bufferFactory().wrap(it) }
        )
    }

    companion object {
        private val logger = LoggerFactory.getLogger(JWTAuthenticationFilter::class.java)
    }
}
resilience4j.circuitbreaker.instances.fallback.registerHealthIndicator=true
resilience4j.circuitbreaker.instances.fallback.sliding-window-size=10
resilience4j.circuitbreaker.instances.fallback.minimum-number-of-calls=5
resilience4j.circuitbreaker.instances.fallback.permitted-number-of-calls-in-half-open-state=3
resilience4j.circuitbreaker.instances.fallback.wait-duration-in-open-state.seconds=30
resilience4j.circuitbreaker.instances.fallback.failure-rate-threshold=50
resilience4j.circuitbreaker.instances.fallback.slow-call-duration-threshold.seconds=3
resilience4j.circuitbreaker.instances.fallback.slow-call-rate-threshold=50
resilience4j.circuitbreaker.instances.fallback.automatic-transition-from-open-to-half-open-enabled=true
resilience4j.circuitbreaker.instances.fallback.sliding-window-type=count_based

Returning response to the client prematurely

request is intercepted by circuitBreaker before the reponse is comming from authserivce

Removing retry and circuitBreaker block fixes the issue. But I need to those. :)

Don't know What am i doing wrong.

Appriciate any and all help. Thank You. :)


Solution

  • Your fundamental problem's using a GatewayFilter with onErrorResume for global error handling, you intercept any Throwable in the reactive chain and terminate the exchange by writing a response immediately. The proper approach should be to use a dedicated implementation of ErrorWebExceptionHandler, which acts as a last-resort handler for the entire application. Let the Retry and CircuitBreaker filters handle the errors they're configured for and use a global exception handler for everything else.

    1. Remove the custom GlobalErrorFilter from your route definition in GatewayConfig. It's catching exceptions prematurely, before Retry and CircuitBreaker filters can inspect the response status code. This prevents them from ever triggering.

    //   .filter(
    //         errorFilter,
    //         FILTER_ORDER
    //   )
        .modifyResponseBody(
    

    2. Create a proper global error handler. It'll catch any unhandled exceptions from your gateway application, including those that Retry and CircuitBreaker filters don't handle.

    @Component
    @Order(-1)
    class GlobalErrorWebExceptionHandler(
        private val mapper: ObjectMapper
    ) : ErrorWebExceptionHandler {
    
        override fun handle(exchange: ServerWebExchange, ex: Throwable): Mono<Void> {
            val (statusCode, responseWrapper) = when (ex) {
                is io.github.resilience4j.circuitbreaker.CallNotPermittedException ->
                    HttpStatus.SERVICE_UNAVAILABLE to ResponseWrapper(
                        status = ResponseStatus.SERVICE_UNAVAILABLE,
                        payload = "Service is temporarily unavailable."
                    )
    
                else ->
                    HttpStatus.INTERNAL_SERVER_ERROR to ResponseWrapper(
                        status = ResponseStatus.INTERNAL_SERVER_ERROR,
                        payload = "An unexpected internal error occurred."
                    )
            }
    
    
        }
    }
    

    3. Update your gateway configuration with your new error handler. And also adjust your Retry configuration to follow best practices. Retrying on 4xx status codes is generally an anti-pattern, as these's client errors and are unlikely to succeed on retry without changes to the request. You should primarily retry on transient server errors (5xx) or connection issues.

    .filters { f ->
        f.retry { retryConfig ->
            retryConfig.setStatuses(
                HttpStatus.INTERNAL_SERVER_ERROR,
                HttpStatus.SERVICE_UNAVAILABLE
            )
            retryConfig.setExceptions(
                java.io.IOException::class.java
            )
        }.circuitBreaker { config ->
            config.setName("fallback")
                  .setFallbackUri("forward:/fallback/auth")
        }