kotlin quarkus project-reactor

Rewrite data stream in async filter with Quarkus and Reactor


I receive a webhook from a third-party service, and I am trying to put a filter in front of the controller to check the signature, in the style of Spring Security, except that this is Quarkus with Reactor. In the controller the body is null, and the debugger does not even reach the controller method.

My goals are:

  1. Check the signature. I need a header and the body for this. WebhookHandlerUtility is not my class and I can't change it, and WebhookHandlerUtility.verifySignature(signature, body) is a synchronous method.
  2. Rewrite the stream, because I expect to receive this data in the controller as the body.

What is the problem with my implementation, and how can I fix it?

@Provider
class SignatureFilter(
        private val objectMapper: ObjectMapper,
        private val webhookHandlerUtility: WebhookHandlerUtility
) : ContainerRequestFilter {

    private val executor = Executors.newFixedThreadPool(5)

    @Throws(IOException::class)
    override fun filter(requestContext: ContainerRequestContext) {
        executor.execute {
            val signature = requestContext.getHeaderString("signature")
            val body = requestContext.entityStream.reader().readText()

            if (!webhookHandlerUtility.verifySignature(signature, body)) {
                throw TerraSignatureVerificationException()
            }

            val payload = webhookHandlerUtility.parseWebhookPayload(body)
                    ?: throw TerraWebHookPayloadParseException()

            val json = objectMapper.writeValueAsString(payload.raw)
            val newInputStream = ByteArrayInputStream(json.toByteArray())

            requestContext.entityStream = newInputStream
        }
    }
}

@ApplicationScoped
@Path("/webHook")
class WebHookController(
        private val omronService: OmronService
) {

    @POST
    @Path("/device")
    @Consumes(MediaType.APPLICATION_JSON)
    fun omronWebHook(req: HookDto): Uni<Response> {
        log.info("Received data: $req")
        return omronService.save(req)
                .map { Response.noContent().build() }
    }
}
2023-09-29 14:30:46,346 DEBUG [org.jbo.res.rea.ser.han.ParameterHandler] (vert.x-eventloop-thread-0) Error occurred during parameter extraction: 
javax.enterprise.inject.UnsatisfiedResolutionException: No bean found for required type [interface javax.ws.rs.container.ContainerRequestContext] and qualifiers [[]]
    at io.quarkus.arc.impl.InstanceImpl.bean(InstanceImpl.java:190)
    at io.quarkus.arc.impl.InstanceImpl.getInternal(InstanceImpl.java:211)
    at io.quarkus.arc.impl.InstanceImpl.get(InstanceImpl.java:97)
    at org.jboss.resteasy.reactive.server.core.parameters.ContextParamExtractor.extractParameter(ContextParamExtractor.java:100)
    at org.jboss.resteasy.reactive.server.handlers.ParameterHandler.handle(ParameterHandler.java:45)
    at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:111)
    at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:142)
    at org.jboss.resteasy.reactive.server.handlers.RestInitialHandler.beginProcessing(RestInitialHandler.java:51)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:18)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:8)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.StaticResourcesRecorder$2.handle(StaticResourcesRecorder.java:84)
    at io.quarkus.vertx.http.runtime.StaticResourcesRecorder$2.handle(StaticResourcesRecorder.java:71)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.VertxHttpRecorder$6.handle(VertxHttpRecorder.java:430)
    at io.quarkus.vertx.http.runtime.VertxHttpRecorder$6.handle(VertxHttpRecorder.java:408)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:196)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:185)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

2023-09-29 14:30:46,349 DEBUG [org.jbo.res.rea.com.cor.AbstractResteasyReactiveContext] (vert.x-eventloop-thread-0) Restarting handler chain for exception exception: 
javax.ws.rs.WebApplicationException: HTTP 400 Bad Request
    at org.jboss.resteasy.reactive.server.handlers.ParameterHandler.handle(ParameterHandler.java:66)
    at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:111)
    at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:142)
    at org.jboss.resteasy.reactive.server.handlers.RestInitialHandler.beginProcessing(RestInitialHandler.java:51)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:18)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:8)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.StaticResourcesRecorder$2.handle(StaticResourcesRecorder.java:84)
    at io.quarkus.vertx.http.runtime.StaticResourcesRecorder$2.handle(StaticResourcesRecorder.java:71)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.VertxHttpRecorder$6.handle(VertxHttpRecorder.java:430)
    at io.quarkus.vertx.http.runtime.VertxHttpRecorder$6.handle(VertxHttpRecorder.java:408)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:196)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:185)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: No bean found for required type [interface javax.ws.rs.container.ContainerRequestContext] and qualifiers [[]]
    at io.quarkus.arc.impl.InstanceImpl.bean(InstanceImpl.java:190)
    at io.quarkus.arc.impl.InstanceImpl.getInternal(InstanceImpl.java:211)
    at io.quarkus.arc.impl.InstanceImpl.get(InstanceImpl.java:97)
    at org.jboss.resteasy.reactive.server.core.parameters.ContextParamExtractor.extractParameter(ContextParamExtractor.java:100)
    at org.jboss.resteasy.reactive.server.handlers.ParameterHandler.handle(ParameterHandler.java:45)
    ... 30 more

2023-09-29 14:30:46,358 INFO  [http-problem] (vert.x-eventloop-thread-0) status=400, title="Bad Request", detail="HTTP 400 Bad Request" 
Exception in thread "pool-6-thread-1" java.lang.IllegalStateException: Cannot be called from response filter
    at org.jboss.resteasy.reactive.server.jaxrs.ContainerRequestContextImpl.assertNotResponse(ContainerRequestContextImpl.java:94)
    at org.jboss.resteasy.reactive.server.jaxrs.ContainerRequestContextImpl.setEntityStream(ContainerRequestContextImpl.java:158)
    at com.dfskjhzxk.terra.configuration.SignatureFilter.filter$lambda$0(SignatureFilter.kt:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)


Solution

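  • The problem is that you are trying to access a request-scoped object (ContainerRequestContext) from a thread that is not managed by Quarkus, because you are using your own executor. executor.execute returns immediately, so filter() finishes and request processing moves on before the lambda has run. That is consistent with the "Cannot be called from response filter" exception thrown from pool-6-thread-1 when the lambda finally calls setEntityStream.

    A simple remedy would be to access whatever you need from ContainerRequestContext in the filter method itself, instead of accessing it in the lambda that is used as the Runnable for the thread.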
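
As a rough illustration of that remedy, here is a minimal sketch of the filter from the question with the executor removed, so that every call on ContainerRequestContext happens inside filter() on the calling thread. WebhookHandlerUtility, payload.raw and the two Terra exceptions are taken from the question as-is, and their imports are omitted because their packages are not shown. It has not been run against a real Quarkus application, so treat it as a starting point rather than a verified fix.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import java.io.ByteArrayInputStream
import javax.ws.rs.container.ContainerRequestContext
import javax.ws.rs.container.ContainerRequestFilter
import javax.ws.rs.ext.Provider

// WebhookHandlerUtility, TerraSignatureVerificationException and
// TerraWebHookPayloadParseException are the question's own types (imports omitted).
@Provider
class SignatureFilter(
    private val objectMapper: ObjectMapper,
    private val webhookHandlerUtility: WebhookHandlerUtility
) : ContainerRequestFilter {

    override fun filter(requestContext: ContainerRequestContext) {
        // Read the header and body here, on the thread that invoked the filter,
        // rather than inside a Runnable handed to a separate executor.
        val signature = requestContext.getHeaderString("signature")
        val body = requestContext.entityStream.reader().readText()

        // verifySignature is synchronous, so it can be called directly.
        if (!webhookHandlerUtility.verifySignature(signature, body)) {
            throw TerraSignatureVerificationException()
        }

        val payload = webhookHandlerUtility.parseWebhookPayload(body)
            ?: throw TerraWebHookPayloadParseException()

        // Replace the consumed stream before filter() returns, so the
        // resource method can still deserialize the body.
        val json = objectMapper.writeValueAsString(payload.raw)
        requestContext.entityStream = ByteArrayInputStream(json.toByteArray())
    }
}
```

One caveat that goes beyond what the answer states: reading the entity stream is a blocking call. If the filter turns out to run on the Vert.x event loop, it may be necessary to move the endpoint to a worker thread, for example by annotating the resource method with @Blocking. This is an assumption to check against the Quarkus version in use.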