dependency-injectionquarkussmallrye-reactive-messaging

Can't inject Emitter into class using Quarkus: jakarta.enterprise.inject.spi.DefinitionException: SRMSG00019: Unable to connect an emitter


I have a class in which I run a check on a schedule using Quartz inside of an Quarkus application. The result should be send with AMQP using an emitter.

Unfortunately the emitter can not be injected. I don't really understand that since I already inject the emitter in other classes, where it works fine.

I also inject a ConfigMapping, which works fine in the same class.

I use Java 21 and Quarkus 3.17

Here is the problematic class (working class at the end):

package myproject.scheduling;

import java.util.List;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import myproject.config.ConnectionChecks;
import myproject.config.Constants.AmqpQueues.Producers;
import myproject.config.Constants.Jobs;
import myproject.models.measurements.ConnectionStatus;
import myproject.scheduling.jobs.ConnectionCheckRunner;

@ApplicationScoped
public class Scheduler {

    @Inject
    @Channel(Producers.MONITORING)
    Emitter<String> emitter;

    @Inject
    ConnectionChecks checks;

    @Scheduled(every = "1s", identity = Jobs.CONNECTION_CHECK)
    void scheduleInternetCheck() {
        List<ConnectionStatus> result = new ConnectionCheckRunner(checks).run();
        result.forEach((entry) -> System.out.println(entry.toLineProtocol()));
        result.forEach((entry) -> emitter.send(entry.toLineProtocol()));
    }
}

Here is the error log:

2024-12-01 22:13:47,731 ERROR [io.qua.sch.com.run.StatusEmitterInvoker] (executor-thread-1) Error occurred while executing task for trigger io.quarkus.quartz.runtime.QuartzSchedulerImpl$QuartzTrigger@7f01115e: java.util.concurrent.CompletionException: java.lang.RuntimeException: Error injecting org.eclipse.microprofile.reactive.messaging.Emitter<java.lang.String> myproject.scheduling.Scheduler.emitter
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:874)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357)
    at java.base/java.util.concurrent.CompletableFuture$MinimalStage.whenComplete(CompletableFuture.java:2948)
    at io.quarkus.scheduler.common.runtime.DefaultInvoker.invoke(DefaultInvoker.java:25)
    at io.quarkus.scheduler.common.runtime.DelegateInvoker.invokeDelegate(DelegateInvoker.java:29)
    at io.quarkus.scheduler.common.runtime.StatusEmitterInvoker.invoke(StatusEmitterInvoker.java:35)
    at io.quarkus.scheduler.common.runtime.DelegateInvoker.invokeDelegate(DelegateInvoker.java:29)
    at io.quarkus.scheduler.common.runtime.DelegateInvoker.invokeComplete(DelegateInvoker.java:36)
    at io.quarkus.scheduler.common.runtime.OffloadingInvoker$2.call(OffloadingInvoker.java:54)
    at io.quarkus.scheduler.common.runtime.OffloadingInvoker$2.call(OffloadingInvoker.java:51)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$4(ContextImpl.java:192)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:270)
    at io.vertx.core.impl.ContextImpl$1.execute(ContextImpl.java:221)
    at io.vertx.core.impl.WorkerTask.run(WorkerTask.java:56)
    at io.quarkus.vertx.core.runtime.VertxCoreRecorder$15.runWith(VertxCoreRecorder.java:642)
    at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2675)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2654)
    at org.jboss.threads.EnhancedQueueExecutor.runThreadBody(EnhancedQueueExecutor.java:1627)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1594)
    at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)
    at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Error injecting org.eclipse.microprofile.reactive.messaging.Emitter<java.lang.String> myproject.scheduling.Scheduler.emitter
    at myproject.scheduling.Scheduler_Bean.doCreate(Unknown Source)
    at myproject.scheduling.Scheduler_Bean.create(Unknown Source)
    at myproject.scheduling.Scheduler_Bean.create(Unknown Source)
    at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
    at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
    at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
    at io.quarkus.arc.generator.Default_jakarta_enterprise_context_ApplicationScoped_ContextInstances.c3(Unknown Source)
    at io.quarkus.arc.generator.Default_jakarta_enterprise_context_ApplicationScoped_ContextInstances.computeIfAbsent(Unknown Source)
    at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
    at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:21)
    at myproject.scheduling.Scheduler_ClientProxy.arc$delegate(Unknown Source)
    at myproject.scheduling.Scheduler_ClientProxy.scheduleInternetCheck(Unknown Source)
    at myproject.scheduling.Scheduler_ScheduledInvoker_scheduleInternetCheck_db9533208f2a6decf3d622882f85ce81424bd3a1.invokeBean(Unknown Source)
    ... 20 more
Caused by: jakarta.enterprise.inject.spi.DefinitionException: SRMSG00019: Unable to connect an emitter with the channel `monitoring-producer`
    at io.smallrye.reactive.messaging.providers.extension.ChannelProducer.getEmitter(ChannelProducer.java:225)
    at io.smallrye.reactive.messaging.providers.extension.ChannelProducer.produceEmitter(ChannelProducer.java:173)
    at io.smallrye.reactive.messaging.providers.extension.ChannelProducer_ProducerMethod_produceEmitter_jRfYFqSs8A2Ams_fdlkwttyVQ3w_Bean.doCreate(Unknown Source)
    at io.smallrye.reactive.messaging.providers.extension.ChannelProducer_ProducerMethod_produceEmitter_jRfYFqSs8A2Ams_fdlkwttyVQ3w_Bean.create(Unknown Source)
    at io.smallrye.reactive.messaging.providers.extension.ChannelProducer_ProducerMethod_produceEmitter_jRfYFqSs8A2Ams_fdlkwttyVQ3w_Bean.get(Unknown Source)
    at io.smallrye.reactive.messaging.providers.extension.ChannelProducer_ProducerMethod_produceEmitter_jRfYFqSs8A2Ams_fdlkwttyVQ3w_Bean.get(Unknown Source)
    at io.quarkus.arc.impl.CurrentInjectionPointProvider.get(CurrentInjectionPointProvider.java:48)
    ... 33 more

Here is an example where I use emitter and it works:

package myproject.rest;

import java.util.Map;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import myproject.config.Constants.AmqpQueues.Producers;
import myproject.models.measurements.metrics.JobMetric;
import myproject.models.measurements.Data;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;

@Path("data")
@RunOnVirtualThread
public class DataResource extends Resource {

    @Inject
    @Channel(Producers.DATA)
    Emitter<String> emitter;

    @Inject
    @Channel(Producers.MONITORING)
    Emitter<String> metricEmitter;

    @POST
    public Map<String, String> postData(Data data) {
        JobMetric metric = startMetric("api_post_data");
        emitter.send(data.toLineProtocol());
        finishMetric(metric, metricEmitter);
        return getOkResponse("Uploaded data sucessfully");
    }

}

Solution

  • The problem is that Quarkus scheduler extensions (quarkus-scheduler and quarkus-quartz) start before the messaging channels are initialized.

    Quarkus scheduler supports delayed start and conditional execution approach as well.

    Delayed start

    Just set delayed attribute to @Scheduled annotation. I highly recommend to extract messaging logic to another bean.

    @ApplicationScoped
    public class ConnectionCheckTask {
    
        @Inject
        ConnectionChecker checker;
    
        @Scheduled(every = "1s", identity = Jobs.CONNECTION_CHECK, delayed = "20s")
        void scheduleInternetCheck() {
            checker.check();
        }
    }
    
    @ApplicationScoped
    public class ConnectionChecker {
    
        @Inject
        @Channel(Producers.MONITORING)
        Emitter<String> emitter;
    
        void check() {
            new ConnectionCheckRunner(checks).run().stream()
                .peek((entry) -> Log.debugf("Checking: %s", entry.toLineProtocol()))
                .forEach((entry) -> emitter.send(entry.toLineProtocol()));
        }
    }
    

    Guide to delayed start is avaliable here: https://quarkus.io/guides/scheduler-reference#delayed_start

    Conditional exection

    Create a predicate bean and check availability. The following example checks the smallrye-amqp connector availability.

    @ApplicationScoped
    public class ConnectionCheckTask {
    
    
        @Scheduled(every = "1s", skipExecutionIf = AMQPLivenessPredicate.class, identity = Jobs.CONNECTION_CHECK)
        void scheduleInternetCheck() {
            // do magic
        }
    }
    
    @Singleton
    public class AMQPLivenessPredicate implements SkipPredicate {
    
        @Inject
        @Connector(AmqpConnector.CONNECTOR_NAME)
        AmqpConnector amqpConnector;
    
        boolean test(ScheduledExecution execution) {
            return !amqpConnector.getStartup().isOk(); 
        }
    }
    

    BTW, Quarkus has an awesome extension for smallrye-health. Why don't you create a simple health check to monitor your application's internet connection in 5 minutues?
    https://quarkus.io/guides/smallrye-health#creating-your-first-health-check