quarkusquarkus-reactive

Quarkus Mutiny retry failes because on another thread on version 3.0.3 Final


This topic is a continuation of question asked here: Quarkus Mutiny retry failes because on another thread

I created a test project here: github.com/sfeliks/retry-test When application is executed and Scheduler starts, SchedulerService throws test RuntimeException. It is expected application tries 3 times and and finishes. The actual result is that even when exception would not be thrown on any of retries, processing still would fail. In current state, job finishes with two errors:

2023-05-17 12:42:33,697 ERROR [io.qua.mut.run.MutinyInfrastructure] (executor-thread-1) Mutiny had to drop the following exception: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] java.lang.RuntimeException: Test Runtime
    [Exception 1] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [116]: 'vert.x-eventloop-thread-3' current Thread [102]: 'executor-thread-1'
    at io.smallrye.mutiny.groups.UniOnItemOrFailure.lambda$call$1(UniOnItemOrFailure.java:75)
    at io.smallrye.context.impl.wrappers.SlowContextualBiFunction.apply(SlowContextualBiFunction.java:21)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:86)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
    at io.smallrye.mutiny.operators.uni.UniOnTermination$UniOnTerminationProcessor.onFailure(UniOnTermination.java:52)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.helpers.EmptyUniSubscription.propagateFailureEvent(EmptyUniSubscription.java:40)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:26)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
    at org.hibernate.reactive.context.impl.VertxContext.execute(VertxContext.java:90)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.subscribe(UniRunSubscribeOn.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnTermination.subscribe(UniOnTermination.java:21)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:99)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onFailure(UniOnItemConsume.java:65)
    at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onFailure(UniOnItemConsume.java:65)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher$PublisherSubscriber.onError(UniCreateFromPublisher.java:78)
    at io.smallrye.mutiny.helpers.HalfSerializer.onError(HalfSerializer.java:59)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onFailure(StrictMultiSubscriber.java:90)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onError(MultiSubscriber.java:73)
    at io.smallrye.mutiny.subscription.SerializedSubscriber.onFailure(SerializedSubscriber.java:101)
    at io.smallrye.mutiny.operators.multi.MultiRetryWhenOp$RetryWhenOperator.whenFailure(MultiRetryWhenOp.java:165)
    at io.smallrye.mutiny.operators.multi.MultiRetryWhenOp$TriggerSubscriber.onError(MultiRetryWhenOp.java:193)
    at io.smallrye.mutiny.helpers.HalfSerializer.onError(HalfSerializer.java:59)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onFailure(StrictMultiSubscriber.java:90)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapMainSubscriber.postponeFailure(MultiConcatMapOp.java:238)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapMainSubscriber.innerFailure(MultiConcatMapOp.java:214)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapInner.onFailure(MultiConcatMapOp.java:297)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onError(MultiSubscriber.java:73)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onFailure(UniToMultiPublisher.java:102)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownFailure$KnownFailureSubscription.forward(UniCreateFromKnownFailure.java:38)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownFailure.subscribe(UniCreateFromKnownFailure.java:23)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:73)
    at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.setOrSwitchUpstream(SwitchableSubscriptionSubscriber.java:205)
    at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.onSubscribe(SwitchableSubscriptionSubscriber.java:107)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:25)
    at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:165)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapMainSubscriber.onItem(MultiConcatMapOp.java:144)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.operators.multi.processors.UnicastProcessor.drainWithDownstream(UnicastProcessor.java:107)
    at io.smallrye.mutiny.operators.multi.processors.UnicastProcessor.drain(UnicastProcessor.java:138)
    at io.smallrye.mutiny.operators.multi.processors.UnicastProcessor.request(UnicastProcessor.java:239)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapMainSubscriber.innerComplete(MultiConcatMapOp.java:198)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapInner.onCompletion(MultiConcatMapOp.java:309)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:94)
    at io.smallrye.mutiny.operators.uni.UniDelayOnItem$UniDelayOnItemProcessor.lambda$onItem$0(UniDelayOnItem.java:53)
    at org.jboss.threads.EnhancedQueueExecutor$RunnableScheduledFuture.performTask(EnhancedQueueExecutor.java:2892)
    at org.jboss.threads.EnhancedQueueExecutor$RunnableScheduledFuture.performTask(EnhancedQueueExecutor.java:2883)
    at org.jboss.threads.EnhancedQueueExecutor$AbstractScheduledFuture.run(EnhancedQueueExecutor.java:2741)
    at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
    at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
    at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
    Suppressed: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [116]: 'vert.x-eventloop-thread-3' current Thread [102]: 'executor-thread-1'
        at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:195)
        at org.hibernate.reactiv.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1777)
        at org.hibernate.internal.AbstractSharedSessionContract.checkOpenOrWaitingForAutoClose(AbstractSharedSessionContract.java:447)
        at org.hibernate.internal.SessionImpl.checkOpenOrWaitingForAutoClose(SessionImpl.java:616)
        at org.hibernate.internal.SessionImpl.closeWithoutOpenChecks(SessionImpl.java:410)
        at org.hibernate.internal.SessionImpl.close(SessionImpl.java:397)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.reactiveClose(ReactiveSessionImpl.java:1729)
        at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:24)
        ... 58 more
Caused by: java.lang.RuntimeException: Test Runtime
    at org.acme.SchedulerService.lambda$retrieveDataAndSaveInLocalDB$2(SchedulerService.java:28)
    at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
    at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.invokeEventHandler(UniOnItemConsume.java:77)
    at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onItem(UniOnItemConsume.java:42)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:43)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:29)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromDeferredSupplier.subscribe(UniCreateFromDeferredSupplier.java:36)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform.subscribe(UniOnItemTransform.java:22)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemConsume.subscribe(UniOnItemConsume.java:30)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:73)
    at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.setOrSwitchUpstream(SwitchableSubscriptionSubscriber.java:205)
    at io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber.onSubscribe(SwitchableSubscriptionSubscriber.java:107)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:25)
    at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:165)
    at io.smallrye.mutiny.operators.multi.MultiRetryWhenOp$RetryWhenOperator.resubscribe(MultiRetryWhenOp.java:157)
    at io.smallrye.mutiny.operators.multi.MultiRetryWhenOp$TriggerSubscriber.onNext(MultiRetryWhenOp.java:188)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:30)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:84)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapMainSubscriber.tryEmit(MultiConcatMapOp.java:182)
    at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapInner.onItem(MultiConcatMapOp.java:285)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:92)
    ... 11 more
    Suppressed: java.lang.IllegalStateException: Retries exhausted: 3/3
        at io.smallrye.mutiny.helpers.ExponentialBackoff.lambda$randomExponentialBackoffFunction$0(ExponentialBackoff.java:43)
        at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
        at io.smallrye.mutiny.groups.MultiOnItem.lambda$transformToUni$6(MultiOnItem.java:267)
        at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapMainSubscriber.onItem(MultiConcatMapOp.java:139)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.operators.multi.processors.UnicastProcessor.drainWithDownstream(UnicastProcessor.java:107)
        at io.smallrye.mutiny.operators.multi.processors.UnicastProcessor.drain(UnicastProcessor.java:138)
        at io.smallrye.mutiny.operators.multi.processors.UnicastProcessor.request(UnicastProcessor.java:239)
        at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapMainSubscriber.innerComplete(MultiConcatMapOp.java:198)
        at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$ConcatMapInner.onCompletion(MultiConcatMapOp.java:309)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
        at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:94)
        ... 11 more
    [CIRCULAR REFERENCE:java.lang.RuntimeException: Test Runtime]

As pointed out in the first question, downgrading to 2.16 fixes this issue.


Solution

  • I think this is a duplicate of this: Quarkus Mutiny retry failes because on another thread The problem is caused by .withBackoff.

    Hibernate Reactive checks two things when the session is used:

    1. The session must be called in a event-loop thread
    2. The session must be used in the same thread it has been created.

    In this case, the first check is failing because of a limitation (or bug) in Mutiny: when retrying using the .withBackoff option, the uni runs in a worker thread (and not in the event-loop thread the session was created). Note that even if you create a new session, the check will fail because the code it's not running in a worker-thread (and not a event-loop thread).

    This works with Quarkus 2.16 because there is a check that makes sure that everything runs in an event-loop thread. This check has been removed in Quarkus 3, this is probably a bug.

    At the moment, I don't have a workaround, but it should work if you avoid the .withBackoff option.