java apache-kafka quarkus smallrye-reactive-messaging kogito

Error trying to send Kogito end & start messages using SmallRye Kafka in Quarkus


I have created a Quarkus app with multiple BPMN files that have start messages and end messages. Below is my application.properties for messaging:

kafka.bootstrap.servers=0.0.0.0\:9092
mp.messaging.incoming.endSales.auto.offset.reset=earliest
mp.messaging.incoming.endSales.connector=smallrye-kafka
mp.messaging.incoming.endSales.topic=endSales
mp.messaging.incoming.endSales.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.outgoing.kogito-processinstances-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-processinstances-events.topic=kogito-processinstances-events
mp.messaging.outgoing.kogito-processinstances-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito-usertaskinstances-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-usertaskinstances-events.topic=kogito-usertaskinstances-events
mp.messaging.outgoing.kogito-usertaskinstances-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito-variables-events.connector=smallrye-kafka
mp.messaging.outgoing.kogito-variables-events.topic=kogito-variables-events
mp.messaging.outgoing.kogito-variables-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_stream.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_stream.topic=endSales
mp.messaging.outgoing.kogito.outgoing_stream.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_newEnds.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_newEnds.topic=newEnds
mp.messaging.outgoing.kogito_outgoing_newEnds.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kogito_outgoing_sudoku.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_sudoku.topic=sudoku
mp.messaging.outgoing.kogito_outgoing_sudoku.value.serializer=org.apache.kafka.common.serialization.StringSerializer

This throws the following error at build time:

 java.lang.IllegalArgumentException: SRMSG00071: Invalid channel configuration -  the `connector` attribute must be set for channel `kogito`
        at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.lambda$new$0(ConnectorConfig.java:50)
        at java.base/java.util.Optional.orElseThrow(Optional.java:403)
        at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.lambda$new$1(ConnectorConfig.java:50)
        at java.base/java.util.Optional.orElseGet(Optional.java:364)
        at io.smallrye.reactive.messaging.providers.impl.ConnectorConfig.<init>(ConnectorConfig.java:49)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.lambda$extractConfigurationFor$0(ConfiguredChannelFactory.java:85)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.extractConfigurationFor(ConfiguredChannelFactory.java:74)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:101)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass.initialize$$superforward1(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass$$function$$4.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:49)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_Subclass.initialize(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:192)

Also, are there any channel naming conventions that relate Kafka topics to Kogito messages in Quarkus?


Solution

  • As explained here, the important thing is that the message name in your BPMN matches the channel name in your application.properties. As a recommendation, use shorter channel names. For example, rather than kogito_outgoing_newEnds, just use newEnds (then you do not need to specify the topic, since the topic defaults to the channel name). As you will see in the blog, kogito_outgoing_stream is the default channel name (which can also be changed through configuration).
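
On the error itself: the posted configuration contains the key mp.messaging.outgoing.kogito.outgoing_stream.value.serializer, with a dot between kogito and outgoing_stream where the neighbouring keys use an underscore. Since channel keys follow the pattern mp.messaging.outgoing.<channel>.<attribute>, that key is most likely read as belonging to a channel named kogito, which has no connector attribute, and this would match the SRMSG00071 message. The sketch below is only a simplified model of that naming rule, not SmallRye's actual parsing code: the class ChannelNames and its methods are hypothetical, and it ignores quoted channel names.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class ChannelNames {
    static final String PREFIX = "mp.messaging.outgoing.";

    // Simplified assumption: the channel name is the text between the
    // prefix and the next dot. Returns null for keys without the prefix.
    static String channelOf(String key) {
        if (!key.startsWith(PREFIX)) {
            return null;
        }
        String rest = key.substring(PREFIX.length());
        int dot = rest.indexOf('.');
        return dot < 0 ? rest : rest.substring(0, dot);
    }

    // Channels that appear in some key but never get a `connector` attribute.
    static Set<String> channelsWithoutConnector(List<String> keys) {
        Set<String> seen = new TreeSet<>();
        Set<String> withConnector = new TreeSet<>();
        for (String key : keys) {
            String channel = channelOf(key);
            if (channel == null) {
                continue;
            }
            seen.add(channel);
            if (key.equals(PREFIX + channel + ".connector")) {
                withConnector.add(channel);
            }
        }
        seen.removeAll(withConnector);
        return seen;
    }

    public static void main(String[] args) {
        // The three kogito_outgoing_stream keys from the question.
        List<String> keys = List.of(
            "mp.messaging.outgoing.kogito_outgoing_stream.connector",
            "mp.messaging.outgoing.kogito_outgoing_stream.topic",
            "mp.messaging.outgoing.kogito.outgoing_stream.value.serializer");
        System.out.println(channelsWithoutConnector(keys)); // prints [kogito]
    }
}
```

If that reading is right, renaming the key to mp.messaging.outgoing.kogito_outgoing_stream.value.serializer should remove the stray kogito channel.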