I want to connect to an Azure Service Bus subscription from Quarkus. I found this article, which suggests using ServiceBusJmsConnectionFactory, and I am trying to make it work with SmallRye Reactive Messaging.

So far I tried:

import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactory
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.Produces
import javax.jms.ConnectionFactory

@ApplicationScoped
class ConnectionFactoryBean {
    @Produces
    fun factory(): ConnectionFactory {
        val connectionStringBuilder = ConnectionStringBuilder("Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=NAME;SharedAccessKey=KEY")
        return ServiceBusJmsConnectionFactory(connectionStringBuilder, null)
    }
}

Then in my application.properties:

mp.messaging.incoming.my_topic_name.connector=smallrye-jms

And finally to receive messages:

@Incoming("my_topic_name")
protected suspend fun receiveMyEvent(myEvent: String) {
    //process
}

In terms of versions:

<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>service-bus-jms-connection-factory</artifactId>
  <version>0.0.1</version>
</dependency>
<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-jms</artifactId>
  <version>3.4.0</version>
</dependency>

It is failing with "An API incompatibility" error.

What is the correct way to receive messages using Kotlin/Quarkus from Azure ServiceBus?

UPDATE: I also tried to follow the approach from "Reactive messaging AMQP and Service Bus with Open Liberty". For that I dropped the two dependencies listed above and replaced them with:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>

I also removed ConnectionFactoryBean completely. With that setup I get the following error:

javax.net.ssl.SSLHandshakeException: Failed to create SSL connection

Adding quarkus.tls.trust-all=true or quarkus.ssl.native=true to application.properties did not help.

It is also still unclear how to connect to the subscription. Would that require a change to the @Incoming annotation?

JoeBloggs
  • 1. please add an example connection string (obfuscated version) of your connection string. with all the "parts" specified (and obfuscated). 2. Please add your (non "*") IMPORT statements. (3) add the packages you are referencing and their versions. – granadaCoder Oct 05 '22 at 14:47
  • @granadaCoder thanks for the comment, I updated the question with details and also outlined one other approach that I tested – JoeBloggs Oct 05 '22 at 15:54
  • So "ssl" is more of a "baked in" thing with service bus. not a "addon" like with http(s). I am guessing, but I think you may need a certificate. Again, I'm intelligently guessing. Maybe read this: https://learn.microsoft.com/en-us/azure/app-service/configure-ssl-certificate?tabs=apex%2Cportal – granadaCoder Oct 06 '22 at 11:53
  • actually, without `mp.messaging.incoming.tenant-manager.use-ssl=true` connection works. But I cannot receive any messages. I guess I need to setup address properly somehow for the topic subscription, but not sure how – JoeBloggs Oct 06 '22 at 12:28

1 Answer

You can use the Azure SDK library (Maven artifact com.azure:azure-messaging-servicebus) to connect to a Service Bus subscription.

To do that, create a ServiceBusProcessorClient, connect it to your namespace using the usual connection string (available in the Azure portal under Shared access policies), specify the topicName and subscriptionName, and then wait for messages. Here is some example code:

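import com.azure.messaging.servicebus.ServiceBusClientBuilder
import com.azure.messaging.servicebus.ServiceBusErrorContext
import com.azure.messaging.servicebus.ServiceBusProcessorClient
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext
import io.quarkus.runtime.ShutdownEvent
import io.quarkus.runtime.StartupEvent
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.event.Observes
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

@ApplicationScoped
class ServiceBusClient {
    private val scope = CoroutineScope(SupervisorJob())

    // Kept as a field so the processor can be closed on shutdown
    @Volatile
    private var processorClient: ServiceBusProcessorClient? = null

    fun onStart(@Observes event: StartupEvent) {
        scope.startClient()
    }

    fun onStop(@Observes event: ShutdownEvent) {
        // Cancelling the scope alone does not stop the processor, so close it explicitly
        processorClient?.close()
        scope.cancel()
    }

    private fun CoroutineScope.startClient() = launch {
        val client = ServiceBusClientBuilder()
            .connectionString("CONNECTION_STRING")
            .processor()
            .topicName("TOPIC_NAME")
            .subscriptionName("SUBSCRIPTION_NAME")
            .processMessage { receivedMessageContext -> onMessage(receivedMessageContext) }
            .processError { errorContext -> onError(errorContext) }
            .buildProcessorClient()

        processorClient = client
        // start() returns immediately; messages are delivered to the callbacks in the background
        client.start()
    }

    private fun onMessage(context: ServiceBusReceivedMessageContext) {
        // Process message
    }

    private fun onError(context: ServiceBusErrorContext) {
        // Handle errors
    }
}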

You could get the content of the message using context.message.applicationProperties["PROPERTY_NAME"], context.message.body or context.message.subject as required.
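
As a rough sketch of how those three parts might be combined, the helper below formats them into a single log line. The name describeMessage and its signature are hypothetical (they are not part of the Azure SDK), and it takes plain Kotlin types so that it can be tried without a Service Bus connection. "PROPERTY_NAME" is the same placeholder as above.

```kotlin
// Hypothetical helper, not part of the Azure SDK: formats the parts of a
// received message for logging. It only uses plain Kotlin types.
fun describeMessage(subject: String?, body: String, properties: Map<String, Any?>): String {
    // "PROPERTY_NAME" is a placeholder for a real application property key
    val propertyValue = properties["PROPERTY_NAME"] ?: "unknown"
    return "subject=${subject ?: "none"}, property=$propertyValue, body=$body"
}

// Inside onMessage it could be called like this (assumes the Azure SDK types):
//   val message = context.message
//   println(describeMessage(message.subject, message.body.toString(), message.applicationProperties))

fun main() {
    // Stand-in values instead of a real Service Bus message
    println(describeMessage("orders", "{\"id\": 1}", mapOf("PROPERTY_NAME" to "created")))
    // prints: subject=orders, property=created, body={"id": 1}
}
```

Keeping the formatting separate from the SDK callback means the processing logic can be unit tested without connecting to the namespace.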

JleruOHeP
  • can u plz support https://stackoverflow.com/questions/74733613/how-to-connect-quarkus-application-with-azure-share-file – Catalina Dec 08 '22 at 18:32