1

I'm trying to read messages from Azure ServiceBus Topics using async/await and then forward the content to another application via HTTP. My code is simple:

import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio.async_client import ServiceBusService

bus_service = ServiceBusService(service_namespace=..., shared_access_key_name=..., shared_access_key_value=...)

async def watch(topic_name, subscription_name):
    print('{} started'.format(topic_name))

    message = bus_service.receive_subscription_message(topic_name, subscription_name, peek_lock=False, timeout=1)

    if message.body is not None:
        async with ClientSession() as session:
            await session.post('ip:port/endpoint',
                               headers={'Content-type': 'application/x-www-form-urlencoded'},
                               data={'data': message.body.decode()})


async def do():
    while True:
        for topic in ['topic1', 'topic2', 'topic3']:
            await watch(topic, 'watcher')


if __name__ == "__main__":
    asyncio.run(do())

I want to look for messages (forever) from various topics and when a message arrives send the POST. I import the aio package from azure which should work in an async way. After many attempts, the only solution I got is this with while True and setting the timeout=1. This is not what I wanted, I'm doing it sequentially.

azure-servicebus version 0.50.3.

This is my first time with async/await probably I'm missing something...
Any solution/suggestions?

User
  • 806
  • 1
  • 11
  • 28
  • [docs](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/0.50.1/azure.servicebus.control_client.html?highlight=servicebusservice#azure.servicebus.control_client.servicebusservice.ServiceBusService.receive_subscription_message) I'm not familiar with the library but you are calling a method to receive a message. This is a blocking call. You should subscribe and then handle received events. It's somewhere [here](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/0.50.1/azure.servicebus.html#module-azure.servicebus.receive_handler) – Tin Nguyen Jul 29 '20 at 09:02
  • Please use azure-servicebus 7.0.0 to use asyncio https://pypi.org/project/azure-servicebus/ – rakshith91 Dec 18 '20 at 18:01

2 Answers2

2

Here's how you'll do it with the latest major version v7 of servicebus Please take a look a the async samples to send and receive subscription messages https://github.com/Azure/azure-sdk-for-python/blob/04290863fa8963ec525a0b2f4079595287e15d93/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py

import os
import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio import ServiceBusClient
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']

async def watch(topic_name, subscription_name):
    async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=subscription_name,
        )
    async with subscription_receiver:
         message = await subscription_receiver.receive_messages(max_wait_time=1)

    if message.body is not None:
        async with ClientSession() as session:
            await session.post('ip:port/endpoint',
                               headers={'Content-type': 'application/x-www-form-urlencoded'},
                               data={'data': message.body.decode()})

async def do():
    while True:
        for topic in ['topic1', 'topic2', 'topic3']:
            await watch(topic, 'watcher')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do())
rakshith91
  • 682
  • 5
  • 13
0

You will have to use the package : azure.servicebus.aio

They have the below modules for async : enter image description here

We will have to use the Receive handler class - it can instantiated with get_receiver() method. With this object you will be able to iterate through the message Asynchronously. Spun up a sample script which does that you could further optimise it :

from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()

        
Receiving = True

#Topic 1 receiver : 
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver =  SubsClient.get_receiver()

#Topic 2 receiver : 
conn_str2= "<>"
name2="Allmessages2"
SubsClient2 = SubscriptionClient.from_connection_string(conn_str2, name2)
receiver2 =  SubsClient2.get_receiver()
    
#obj= SubscriptionClient("svijayservicebus","mytopic1", shared_access_key_name="RootManageSharedAccessKey", shared_access_key_value="ySr+maBCmIRDK4I1aGgkoBl5sNNxJt4HTwINo0FQ/tc=")
async def receive_message_from1():
    await receiver.open()
    print("Opening the Receiver for Topic1")
    async with receiver:
      while(Receiving):
        msgs =  await receiver.fetch_next()
        for m in msgs:
            print("Received the message from topic 1.....")
            print(str(m))
            await m.complete()
       
async def receive_message_from2():
    await receiver2.open()
    print("Opening the Receiver for Topic2")
    async with receiver2:
      while(Receiving):
        msgs =  await receiver2.fetch_next()
        for m in msgs:
            print("Received the message from topic 2.....")
            print(str(m))
            await m.complete()
               



loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
topic2receiver = loop.create_task(receive_message_from2())

I have created two tasks to facilitate the concurrency. You could refer this post to get more clarity on them.

Output : enter image description here

Satya V
  • 3,811
  • 1
  • 6
  • 9
  • I already tried something similar using `azure-servicebus == 7.0.0b4` but I got `ConnectionClose('ErrorCodes.UnknownError: Connection in an unexpected error state.')`, which is the same error I'm getting using your solution. I also copied and pasted one example from Azure official Github and had the same error. – User Aug 03 '20 at 06:01
  • Are you getting the above error after sometime or at the beginning of the execution? – Satya V Aug 03 '20 at 08:59
  • Beginning. The connection string is topic related, right? – User Aug 03 '20 at 09:12
  • Yes that's right. I would be needing few more details like the debug logs. I can also take it up offline for a closer look and provide a quick and specialized assistance, please send an email with subject line “Attn:Sathya” to AzCommunity[at]Microsoft[dot]com referencing this thread along with the code sample you re using and debug log file. Ref : https://learn.microsoft.com/en-us/azure/developer/python/azure-sdk-logging – Satya V Aug 03 '20 at 10:21