I want to connect to an Azure Service Bus subscription from Quarkus. I found this article, which suggests using ServiceBusJmsConnectionFactory, and I am trying to make it work with SmallRye Reactive Messaging.
So far I have tried:
    import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactory
    import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder
    import javax.enterprise.context.ApplicationScoped
    import javax.enterprise.inject.Produces
    import javax.jms.ConnectionFactory

    @ApplicationScoped
    class ConnectionFactoryBean {
        @Produces
        fun factory(): ConnectionFactory {
            var connectionStringBuilder = ConnectionStringBuilder("Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=NAME;SharedAccessKey=KEY");
            return ServiceBusJmsConnectionFactory(connectionStringBuilder, null)
        }
    }
Then in my application.properties:
mp.messaging.incoming.my_topic_name.connector=smallrye-jms
And finally to receive messages:
    @Incoming("my_topic_name")
    protected suspend fun receiveMyEvent(myEvent: String) {
        //process
    }
In terms of versions:
    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>service-bus-jms-connection-factory</artifactId>
        <version>0.0.1</version>
    </dependency>
    <dependency>
        <groupId>io.smallrye.reactive</groupId>
        <artifactId>smallrye-reactive-messaging-jms</artifactId>
        <version>3.4.0</version>
    </dependency>
It is failing with "An API incompatibility" error.
What is the correct way to receive messages using Kotlin/Quarkus from Azure ServiceBus?
UPDATE: I also tried to follow the approach from "Reactive messaging AMQP and Service Bus with Open Liberty". For that I dropped the two dependencies listed above and replaced them with:
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
    </dependency>
I also completely removed ConnectionFactoryBean.
With that setup I get a javax.net.ssl.SSLHandshakeException: Failed to create SSL connection error. Adding quarkus.tls.trust-all=true or quarkus.ssl.native=true to the properties did not help.
It is also still unclear how to connect to the subscription. Would that be a change to the @Incoming attribute?
You can use the Azure SDK library (Maven artifact com.azure:azure-messaging-servicebus, package com.azure.messaging.servicebus) to connect to a Service Bus subscription.
For that you need to create a ServiceBusProcessorClient, connect to your namespace using the usual connection string (available in the Azure portal under the namespace's shared access policies), specify its topicName and subscriptionName, and then wait for messages. Here is some example code:
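    import com.azure.messaging.servicebus.ServiceBusClientBuilder
    import com.azure.messaging.servicebus.ServiceBusErrorContext
    import com.azure.messaging.servicebus.ServiceBusProcessorClient
    import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext
    import io.quarkus.runtime.ShutdownEvent
    import io.quarkus.runtime.StartupEvent
    import javax.enterprise.context.ApplicationScoped
    import javax.enterprise.event.Observes
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.SupervisorJob
    import kotlinx.coroutines.cancel
    import kotlinx.coroutines.launch

    @ApplicationScoped
    class ServiceBusClient() {
        private val scope = CoroutineScope(SupervisorJob())

        // Start listening when the Quarkus application starts.
        fun onStart(@Observes event: StartupEvent) {
            scope.startClient()
        }

        // Cancel the coroutine scope when the application shuts down.
        fun onStop(@Observes event: ShutdownEvent) {
            scope.cancel()
        }

        private fun CoroutineScope.startClient() = launch {
            val processorClient: ServiceBusProcessorClient = ServiceBusClientBuilder()
                .connectionString("CONNECTION_STRING")
                .processor()
                .topicName("TOPIC_NAME")
                .subscriptionName("SUBSCRIPTION_NAME")
                .processMessage { receivedMessageContext -> onMessage(receivedMessageContext) }
                .processError { errorContext -> onError(errorContext) }
                .buildProcessorClient()
            processorClient.start()
        }

        private fun onMessage(context: ServiceBusReceivedMessageContext) {
            //Process message
        }

        private fun onError(context: ServiceBusErrorContext) {
            //Handle errors
        }
    }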
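One thing to be aware of: processorClient.start() returns immediately and the processor keeps receiving in the background, so cancelling a coroutine scope does not by itself stop it. Below is a minimal sketch of a variant that keeps a reference to the processor and closes it on shutdown, and that reads its settings from configuration instead of hard-coding them. The class name ManagedServiceBusClient and the property keys servicebus.connection-string, servicebus.topic and servicebus.subscription are assumptions made for illustration; use whatever names fit your application.

```kotlin
import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.ServiceBusErrorContext
import com.azure.messaging.servicebus.ServiceBusProcessorClient
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext
import io.quarkus.runtime.ShutdownEvent
import io.quarkus.runtime.StartupEvent
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.event.Observes
import org.eclipse.microprofile.config.ConfigProvider

@ApplicationScoped
class ManagedServiceBusClient {
    // Kept as a field so the processor can be closed on shutdown.
    private var processorClient: ServiceBusProcessorClient? = null

    fun onStart(@Observes event: StartupEvent) {
        // Hypothetical property keys, expected in application.properties.
        val config = ConfigProvider.getConfig()
        processorClient = ServiceBusClientBuilder()
            .connectionString(config.getValue("servicebus.connection-string", String::class.java))
            .processor()
            .topicName(config.getValue("servicebus.topic", String::class.java))
            .subscriptionName(config.getValue("servicebus.subscription", String::class.java))
            .processMessage { context -> onMessage(context) }
            .processError { context -> onError(context) }
            .buildProcessorClient()
            .also { it.start() } // non-blocking: receiving happens in the background
    }

    fun onStop(@Observes event: ShutdownEvent) {
        // Stops receiving and releases the underlying connection.
        processorClient?.close()
    }

    private fun onMessage(context: ServiceBusReceivedMessageContext) {
        // Process message
    }

    private fun onError(context: ServiceBusErrorContext) {
        // Handle errors
    }
}
```

Since start() does not block, this variant does not need a coroutine scope at all; whether you keep one depends on how you want to process messages inside onMessage.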
You can get the content of the message using context.message.applicationProperties["PROPERTY_NAME"], context.message.body, or context.message.subject, as required.
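As a rough sketch of how those accessors could be used inside the message handler: the example below pulls the subject, body and one application property into a small holder. MessageSummary and summarize are hypothetical names introduced only for this illustration, and "PROPERTY_NAME" is the same placeholder as above. Note that context.message.body is a BinaryData, so it is converted to text with toString(), which assumes the payload is UTF-8 text.

```kotlin
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext

// Hypothetical holder for the parts of a message the handler cares about.
data class MessageSummary(val subject: String?, val body: String, val property: Any?)

// Pure helper, kept separate so it can be unit-tested without a Service Bus connection.
fun summarize(subject: String?, body: String, properties: Map<String, Any?>): MessageSummary =
    MessageSummary(subject, body, properties["PROPERTY_NAME"])

fun onMessage(context: ServiceBusReceivedMessageContext) {
    val message = context.message
    // body is BinaryData; toString() decodes it as UTF-8 text.
    val summary = summarize(message.subject, message.body.toString(), message.applicationProperties)
    println("Received $summary")
}
```

A property that is not set on the message simply comes back as null from the map lookup, so the handler should be prepared for that.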