How to connect to Azure Service Bus in Quarkus


I want to connect to an Azure Service Bus subscription from Quarkus. I found this article, which suggests using ServiceBusJmsConnectionFactory, and I am trying to make it work with SmallRye.

So far I have tried:

import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactory
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.Produces
import javax.jms.ConnectionFactory

@ApplicationScoped
class ConnectionFactoryBean {
    @Produces
    fun factory(): ConnectionFactory {
        var connectionStringBuilder = ConnectionStringBuilder("Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=NAME;SharedAccessKey=KEY");
        return ServiceBusJmsConnectionFactory(connectionStringBuilder, null)
    }
}

Then in my application.properties:

mp.messaging.incoming.my_topic_name.connector=smallrye-jms

And finally to receive messages:

@Incoming("my_topic_name")
protected suspend fun receiveMyEvent(myEvent: String) {
    //process
}

In terms of versions:

<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>service-bus-jms-connection-factory</artifactId>
  <version>0.0.1</version>
</dependency>
<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-jms</artifactId>
  <version>3.4.0</version>
</dependency>

It fails with an "An API incompatibility" error.

What is the correct way to receive messages from Azure Service Bus using Kotlin and Quarkus?

UPDATE: I also tried to follow the approach from "Reactive messaging AMQP and Service Bus with Open Liberty". For that, I dropped the two dependencies listed above and replaced them with:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>

I also removed ConnectionFactoryBean completely. With this setup I get the following error:

javax.net.ssl.SSLHandshakeException: Failed to create SSL connection

Adding quarkus.tls.trust-all=true or quarkus.ssl.native=true to the properties did not help.

It is also still unclear how to connect to the subscription. Does that require a change to the @Incoming annotation?


Solution

  • You can use the Azure SDK library (Maven artifact com.azure:azure-messaging-servicebus) to connect to a Service Bus subscription.
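
    As a rough sketch, the dependency could be declared like this in pom.xml. The version number is an assumption chosen only for illustration; check Maven Central for the release that suits your project.

    ```xml
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-servicebus</artifactId>
      <!-- Version is an assumption; use the current release -->
      <version>7.13.3</version>
    </dependency>
    ```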

    To do that, create a ServiceBusProcessorClient that connects to your namespace using the usual connection string (available in the Azure portal under the shared access policies), specify its topicName and subscriptionName, and then wait for messages. Here is some example code:

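    import com.azure.messaging.servicebus.ServiceBusClientBuilder
    import com.azure.messaging.servicebus.ServiceBusErrorContext
    import com.azure.messaging.servicebus.ServiceBusProcessorClient
    import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext
    import io.quarkus.runtime.ShutdownEvent
    import io.quarkus.runtime.StartupEvent
    import javax.enterprise.context.ApplicationScoped
    import javax.enterprise.event.Observes
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.SupervisorJob
    import kotlinx.coroutines.cancel
    import kotlinx.coroutines.launch

    @ApplicationScoped
    class ServiceBusClient {
        private val scope = CoroutineScope(SupervisorJob())

        // Kept so that the processor can be closed on shutdown
        @Volatile
        private var processorClient: ServiceBusProcessorClient? = null

        fun onStart(@Observes event: StartupEvent) {
            scope.startClient()
        }

        fun onStop(@Observes event: ShutdownEvent) {
            // Cancelling the scope alone does not stop the processor, so close it explicitly
            processorClient?.close()
            scope.cancel()
        }

        private fun CoroutineScope.startClient() = launch {
            val client = ServiceBusClientBuilder()
                .connectionString("CONNECTION_STRING")
                .processor()
                .topicName("TOPIC_NAME")
                .subscriptionName("SUBSCRIPTION_NAME")
                .processMessage { receivedMessageContext -> onMessage(receivedMessageContext) }
                .processError { errorContext -> onError(errorContext) }
                .buildProcessorClient()

            processorClient = client
            // start() returns immediately; messages are delivered on the SDK's own threads
            client.start()
        }

        private fun onMessage(context: ServiceBusReceivedMessageContext) {
            // Process the message
        }

        private fun onError(context: ServiceBusErrorContext) {
            // Handle errors
        }
    }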
    

    You can read the message content via context.message.applicationProperties["PROPERTY_NAME"], context.message.body, or context.message.subject, as required.
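
    The accessors above could be used in the two handlers roughly as follows. This is a minimal sketch, not a definitive implementation: the property name "eventType" is hypothetical, the payload is assumed to be UTF-8 text, and println stands in for whatever logging you use.

    ```kotlin
    import com.azure.messaging.servicebus.ServiceBusErrorContext
    import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext

    // Hypothetical application property name, used only for illustration
    private const val EVENT_TYPE_PROPERTY = "eventType"

    fun onMessage(context: ServiceBusReceivedMessageContext) {
        val message = context.message

        // body is a BinaryData; toString() decodes its bytes as UTF-8 text
        val payload: String = message.body.toString()

        // applicationProperties is a map, so the key may be absent or not a String
        val eventType = message.applicationProperties[EVENT_TYPE_PROPERTY] as? String

        println("subject=${message.subject} eventType=$eventType payload=$payload")
    }

    fun onError(context: ServiceBusErrorContext) {
        // errorSource indicates which operation failed (for example receiving or settling)
        println("Error on ${context.entityPath} (${context.errorSource}): ${context.exception.message}")
    }
    ```

    With the processor's default settings, a message is completed when onMessage returns normally and abandoned when it throws, so the handler should only let an exception escape if the message ought to be redelivered.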