I have a class that runs a check on a schedule using Quartz inside a Quarkus application. The result should be sent over AMQP using an emitter.
Unfortunately, the emitter cannot be injected. I don't understand why, since I already inject the emitter in other classes, where it works fine.
I also inject a ConfigMapping in the same class, and that works fine.
I use Java 21 and Quarkus 3.17.
Here is the problematic class (a working class is at the end):
package myproject.scheduling;
import java.util.List;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import myproject.config.ConnectionChecks;
import myproject.config.Constants.AmqpQueues.Producers;
import myproject.config.Constants.Jobs;
import myproject.models.measurements.ConnectionStatus;
import myproject.scheduling.jobs.ConnectionCheckRunner;
@ApplicationScoped
public class Scheduler {
    @Inject
    @Channel(Producers.MONITORING)
    Emitter<String> emitter;
    @Inject
    ConnectionChecks checks;
    @Scheduled(every = "1s", identity = Jobs.CONNECTION_CHECK)
    void scheduleInternetCheck() {
        List<ConnectionStatus> result = new ConnectionCheckRunner(checks).run();
        result.forEach((entry) -> System.out.println(entry.toLineProtocol()));
        result.forEach((entry) -> emitter.send(entry.toLineProtocol()));
    }
}
Here is the error log:
2024-12-01 22:13:47,731 ERROR [io.qua.sch.com.run.StatusEmitterInvoker] (executor-thread-1) Error occurred while executing task for trigger io.quarkus.quartz.runtime.QuartzSchedulerImpl$QuartzTrigger@7f01115e: java.util.concurrent.CompletionException: java.lang.RuntimeException: Error injecting org.eclipse.microprofile.reactive.messaging.Emitter<java.lang.String> myproject.scheduling.Scheduler.emitter
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:874)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357)
at java.base/java.util.concurrent.CompletableFuture$MinimalStage.whenComplete(CompletableFuture.java:2948)
at io.quarkus.scheduler.common.runtime.DefaultInvoker.invoke(DefaultInvoker.java:25)
at io.quarkus.scheduler.common.runtime.DelegateInvoker.invokeDelegate(DelegateInvoker.java:29)
at io.quarkus.scheduler.common.runtime.StatusEmitterInvoker.invoke(StatusEmitterInvoker.java:35)
at io.quarkus.scheduler.common.runtime.DelegateInvoker.invokeDelegate(DelegateInvoker.java:29)
at io.quarkus.scheduler.common.runtime.DelegateInvoker.invokeComplete(DelegateInvoker.java:36)
at io.quarkus.scheduler.common.runtime.OffloadingInvoker$2.call(OffloadingInvoker.java:54)
at io.quarkus.scheduler.common.runtime.OffloadingInvoker$2.call(OffloadingInvoker.java:51)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$4(ContextImpl.java:192)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:270)
at io.vertx.core.impl.ContextImpl$1.execute(ContextImpl.java:221)
at io.vertx.core.impl.WorkerTask.run(WorkerTask.java:56)
at io.quarkus.vertx.core.runtime.VertxCoreRecorder$15.runWith(VertxCoreRecorder.java:642)
at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2675)
at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2654)
at org.jboss.threads.EnhancedQueueExecutor.runThreadBody(EnhancedQueueExecutor.java:1627)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1594)
at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)
at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Error injecting org.eclipse.microprofile.reactive.messaging.Emitter<java.lang.String> myproject.scheduling.Scheduler.emitter
at myproject.scheduling.Scheduler_Bean.doCreate(Unknown Source)
at myproject.scheduling.Scheduler_Bean.create(Unknown Source)
at myproject.scheduling.Scheduler_Bean.create(Unknown Source)
at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
at io.quarkus.arc.generator.Default_jakarta_enterprise_context_ApplicationScoped_ContextInstances.c3(Unknown Source)
at io.quarkus.arc.generator.Default_jakarta_enterprise_context_ApplicationScoped_ContextInstances.computeIfAbsent(Unknown Source)
at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:21)
at myproject.scheduling.Scheduler_ClientProxy.arc$delegate(Unknown Source)
at myproject.scheduling.Scheduler_ClientProxy.scheduleInternetCheck(Unknown Source)
at myproject.scheduling.Scheduler_ScheduledInvoker_scheduleInternetCheck_db9533208f2a6decf3d622882f85ce81424bd3a1.invokeBean(Unknown Source)
... 20 more
Caused by: jakarta.enterprise.inject.spi.DefinitionException: SRMSG00019: Unable to connect an emitter with the channel `monitoring-producer`
at io.smallrye.reactive.messaging.providers.extension.ChannelProducer.getEmitter(ChannelProducer.java:225)
at io.smallrye.reactive.messaging.providers.extension.ChannelProducer.produceEmitter(ChannelProducer.java:173)
at io.smallrye.reactive.messaging.providers.extension.ChannelProducer_ProducerMethod_produceEmitter_jRfYFqSs8A2Ams_fdlkwttyVQ3w_Bean.doCreate(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.ChannelProducer_ProducerMethod_produceEmitter_jRfYFqSs8A2Ams_fdlkwttyVQ3w_Bean.create(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.ChannelProducer_ProducerMethod_produceEmitter_jRfYFqSs8A2Ams_fdlkwttyVQ3w_Bean.get(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.ChannelProducer_ProducerMethod_produceEmitter_jRfYFqSs8A2Ams_fdlkwttyVQ3w_Bean.get(Unknown Source)
at io.quarkus.arc.impl.CurrentInjectionPointProvider.get(CurrentInjectionPointProvider.java:48)
... 33 more
Here is an example where I use the emitter and it works:
package myproject.rest;
import java.util.Map;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import myproject.config.Constants.AmqpQueues.Producers;
import myproject.models.measurements.metrics.JobMetric;
import myproject.models.measurements.Data;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
@Path("data")
@RunOnVirtualThread
public class DataResource extends Resource {
    @Inject
    @Channel(Producers.DATA)
    Emitter<String> emitter;
    @Inject
    @Channel(Producers.MONITORING)
    Emitter<String> metricEmitter;
    @POST
    public Map<String, String> postData(Data data) {
        JobMetric metric = startMetric("api_post_data");
        emitter.send(data.toLineProtocol());
        finishMetric(metric, metricEmitter);
        return getOkResponse("Uploaded data sucessfully");
    }
}
The problem is that the Quarkus scheduler extensions (quarkus-scheduler and quarkus-quartz) start before the messaging channels are initialized.
The Quarkus scheduler supports two ways around this: a delayed start and conditional execution.
For a delayed start, just set the delayed attribute on the @Scheduled annotation.
I highly recommend extracting the messaging logic into another bean.
@ApplicationScoped
public class ConnectionCheckTask {
    @Inject
    ConnectionChecker checker;
    // Wait 20 seconds after startup before the first run, so the channels have time to initialize.
    @Scheduled(every = "1s", identity = Jobs.CONNECTION_CHECK, delayed = "20s")
    void scheduleInternetCheck() {
        checker.check();
    }
}
@ApplicationScoped
public class ConnectionChecker {
    @Inject
    @Channel(Producers.MONITORING)
    Emitter<String> emitter;
    // Required by ConnectionCheckRunner below, injected as in the original Scheduler class.
    @Inject
    ConnectionChecks checks;
    void check() {
        new ConnectionCheckRunner(checks).run().stream()
            .peek((entry) -> Log.debugf("Checking: %s", entry.toLineProtocol()))
            .forEach((entry) -> emitter.send(entry.toLineProtocol()));
    }
}
The guide to delayed start is available here: https://quarkus.io/guides/scheduler-reference#delayed_start
For conditional execution, create a predicate bean that checks availability. The following example checks the availability of the smallrye-amqp connector.
@ApplicationScoped
public class ConnectionCheckTask {
    @Scheduled(every = "1s", skipExecutionIf = AMQPLivenessPredicate.class, identity = Jobs.CONNECTION_CHECK)
    void scheduleInternetCheck() {
        // do magic
    }
}
@Singleton
public class AMQPLivenessPredicate implements SkipPredicate {
    @Inject
    @Connector(AmqpConnector.CONNECTOR_NAME)
    AmqpConnector amqpConnector;
    // Returning true skips the execution, so skip while the connector has not started.
    // The method must be public because it implements an interface method.
    @Override
    public boolean test(ScheduledExecution execution) {
        return !amqpConnector.getStartup().isOk();
    }
}
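Stripped of the Quarkus specifics, a skip predicate is just a guard that is evaluated on every trigger: if it returns true, that run is dropped and the next trigger tries again. Here is a minimal plain-Java sketch of that behaviour. GuardedTask, runOnce and executions are hypothetical names made up for illustration, not Quarkus API, and the connectorReady flag stands in for the connector's startup report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical model of a scheduled job guarded by a skip predicate (not Quarkus API).
class GuardedTask {
    private final BooleanSupplier skipIf;
    private final Runnable job;
    private final AtomicInteger executions = new AtomicInteger();

    GuardedTask(BooleanSupplier skipIf, Runnable job) {
        this.skipIf = skipIf;
        this.job = job;
    }

    // Called once per trigger. Returns true if the job ran, false if it was skipped.
    boolean runOnce() {
        if (skipIf.getAsBoolean()) {
            return false; // dependency not ready yet, drop this run
        }
        job.run();
        executions.incrementAndGet();
        return true;
    }

    int executions() {
        return executions.get();
    }

    public static void main(String[] args) {
        AtomicBoolean connectorReady = new AtomicBoolean(false); // stand-in for the connector state
        List<String> sent = new ArrayList<>();                   // stand-in for the emitter

        GuardedTask task = new GuardedTask(() -> !connectorReady.get(), () -> sent.add("status ok"));

        task.runOnce();           // skipped: the connector is not ready
        connectorReady.set(true);
        task.runOnce();           // runs: the guard no longer asks to skip

        System.out.println(sent.size() + " message(s) sent, " + task.executions() + " execution(s)");
    }
}
```

Unlike a fixed delay, the guard does not have to guess how long startup takes; the trade-off is that the predicate is evaluated on every trigger.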
BTW, Quarkus has an awesome extension for smallrye-health. Why don't you spend 5 minutes creating a simple health check to monitor your application's internet connection?
https://quarkus.io/guides/smallrye-health#creating-your-first-health-check
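The probe at the core of such a health check could be as small as a TCP connect with a timeout. The sketch below is plain Java only; ConnectivityProbe and isReachable are hypothetical names, and the host and port in main are placeholder assumptions. In a Quarkus application you would call something like this from the health check's call() method and turn the boolean into an up or down response, as the linked guide describes.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: reports whether a TCP connection to host:port can be opened in time.
final class ConnectivityProbe {

    private ConnectivityProbe() {
    }

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws an IOException on refusal, timeout, or an unresolvable host
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder target; pick a host that matters for your application.
        String host = args.length > 0 ? args[0] : "quarkus.io";
        System.out.println(host + " reachable: " + isReachable(host, 443, 2000));
    }
}
```

A successful TCP connect only shows that the remote port accepts connections, so treat it as a coarse signal rather than proof that the service behind it works.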