Hello, I have the following code that deletes dead-letter queue (DLQ) messages from Azure Service Bus topics:

import asyncio
from azure.servicebus.aio import ServiceBusClient 
from azure.servicebus import ServiceBusSubQueue
from azure.servicebus.management import ServiceBusAdministrationClient
from datetime import datetime
import pytz
import os
from dateutil.relativedelta import relativedelta

NAMESPACE_CONNECTION_STR = os.environ.get('NAMESPACE_CONNECTION_STR')
servicebus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(NAMESPACE_CONNECTION_STR)
TOPIC_NAME = os.environ.get('TOPIC_NAME')
SUBSCRIPTION_NAME = os.environ.get('SUBSCRIPTION_NAME')
DAYS=os.environ.get('DAYS')
TARGET_DATE = datetime.now(tz=pytz.UTC) + relativedelta(hours=-int(DAYS))  # Cutoff datetime: DAYS is subtracted as hours here.
TOPIC_INFO_BEFORE = servicebus_mgmt_client.get_subscription_runtime_properties(TOPIC_NAME,SUBSCRIPTION_NAME).dead_letter_message_count
DELETE_COUNTER = 0
TOTAL_PROCESSED = 0

if TOPIC_NAME is None or SUBSCRIPTION_NAME is None:
    raise ValueError('TOPIC_NAME and SUBSCRIPTION_NAME must be passed!')

print(f"All messages that have Enqueued Time less than {TARGET_DATE} will be deleted.")
print(f"Number of dead-letter messages before deletion: {TOPIC_INFO_BEFORE}")


async def dlq_receiver(servicebus_client, topic_name, subscription_name):
    async with servicebus_client:
        global TOTAL_PROCESSED
        global DELETE_COUNTER
        receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, subscription_name=SUBSCRIPTION_NAME, sub_queue=ServiceBusSubQueue.DEAD_LETTER)

        print("EntityPath: " + receiver.entity_path)
        messages = []
        async with receiver:
            while (True):
                try:
                    received_msgs = await receiver.receive_messages(max_wait_time=5, max_message_count=10)
                except asyncio.TimeoutError:
                    print("Timed out while waiting for messages")
                    continue
                TOTAL_PROCESSED += len(received_msgs)
                if len(received_msgs) == 0 or TOTAL_PROCESSED >= TOPIC_INFO_BEFORE:
                    print(f"Finished")
                    return
                for msg in received_msgs:
                    if msg.enqueued_time_utc < TARGET_DATE:

                        messages.append(msg)
                    else:
                        print("Message is not processed due to  EnqueuedDatetime: " + str(msg.enqueued_time_utc) + " sequenceNo.: " + str(msg.sequence_number))

                for mes in messages:
                  print("Processed message EnqueuedDatetime: " + str(mes.enqueued_time_utc) + " sequenceNo.: " + str(mes.sequence_number))
                  DELETE_COUNTER += 1
                  await receiver.complete_message(mes)
                messages.clear()


async def dlq_multiple_client(connection_string, topic_name, subscription_name):
    # Can increase this count to run more receiver clients.
    concurrent_receivers = 5

    client = ServiceBusClient.from_connection_string(connection_string)

    receiver_clients = [dlq_receiver(client, topic_name, subscription_name) for _ in range(concurrent_receivers)]
    await asyncio.gather(*receiver_clients)


if __name__ == '__main__':
    asyncio.run(dlq_multiple_client(NAMESPACE_CONNECTION_STR, TOPIC_NAME, SUBSCRIPTION_NAME))
    print(f"Clean up is completed! Total count of deleted messages: {DELETE_COUNTER}")
    TOPIC_INFO_AFTER = servicebus_mgmt_client.get_subscription_runtime_properties(TOPIC_NAME,SUBSCRIPTION_NAME).dead_letter_message_count
    print(f"Messages left in Dead Letter Queue: {TOPIC_INFO_AFTER}")

Sometimes it executes successfully but sometimes at the end I receive an error:

Traceback (most recent call last):
  File "C:\Users\Davyd_Derkach\PycharmProjects\pythonProject\main.py", line 66, in <module>
    asyncio.run(dlq_multiple_client(NAMESPACE_CONNECTION_STR, TOPIC_NAME, SUBSCRIPTION_NAME))
  File "C:\Users\Davyd_Derkach\AppData\Local\Programs\Python\Python39\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\Davyd_Derkach\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 647, in run_until_complete
    return future.result()
  File "C:\Users\Davyd_Derkach\PycharmProjects\pythonProject\main.py", line 62, in dlq_multiple_client
    await asyncio.gather(*receiver_clients)
  File "C:\Users\Davyd_Derkach\PycharmProjects\pythonProject\main.py", line 41, in dlq_receiver
    return
  File "C:\Users\Davyd_Derkach\PycharmProjects\pythonProject\venv\lib\site-packages\azure\servicebus\aio\_servicebus_client_async.py", line 136, in __aexit__
    await self.close()
  File "C:\Users\Davyd_Derkach\PycharmProjects\pythonProject\venv\lib\site-packages\azure\servicebus\aio\_servicebus_client_async.py", line 224, in close
    for handler in self._handlers:
  File "C:\Users\Davyd_Derkach\AppData\Local\Programs\Python\Python39\lib\_weakrefset.py", line 65, in __iter__
    for itemref in self.data:
RuntimeError: Set changed size during iteration

I noticed that it sometimes happens when I change the number of concurrent receivers (concurrent_receivers = 5), or when I change max_wait_time or max_message_count in (max_wait_time=5, max_message_count=10).

Could someone help with this?

Following a suggestion I found on the Internet, I tried to iterate over something like a copy of my set:

                for mes in messages:
                  print("Processed message EnqueuedDatetime: " + str(mes.enqueued_time_utc) + " sequenceNo.: " + str(mes.sequence_number))
                  DELETE_COUNTER += 1
                  await receiver.complete_message(mes)
                  messages.clear()
  • Welcome to [so]. We don't provide debugging services. The following references give advice on debugging your code. [How to debug small programs](https://ericlippert.com/2014/03/05/how-to-debug-small-programs/), [Six Debugging Techniques for Python Programmers](https://medium.com/techtofreedom/six-debugging-techniques-for-python-programmers-cb25a4baaf4b) or [Ultimate Guide to Python Debugging](https://towardsdatascience.com/ultimate-guide-to-python-debugging-854dea731e1b) – itprorh66 May 05 '23 at 14:27

2 Answers

I tested two ways of deleting the messages. Both worked for me, so try them in your environment.

  • PeekAsync method: this requires specifying a SequenceNumber, as described in the linked reference.

Below is what I tried for receiving the messages from Service Bus and completing them:

received_msgs = receiver.receive_messages(max_message_count=1, max_wait_time=5)

for msg in received_msgs:
    print("Received message: " + str(msg))
    receiver.complete_message(msg)
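
For the concurrent version in the question, a possible cause of the intermittent RuntimeError is that every receiver coroutine enters and exits the same shared client with its own `async with`, so the first one to finish closes the client while the others are still using it. This is an assumption based on the traceback, not something confirmed from the SDK source. One way to avoid it is to open the shared client once in the parent coroutine. The sketch below only illustrates that lifetime pattern: `StubClient`, `worker`, and `run_workers` are hypothetical names, and the stub stands in for `ServiceBusClient` so the example runs without Azure.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in for ServiceBusClient; it only tracks close calls."""

    def __init__(self):
        self.close_calls = 0
        self.closed_while_busy = False
        self.active_workers = 0

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        self.close_calls += 1
        # Record whether the client was closed while a worker was still running.
        if self.active_workers:
            self.closed_while_busy = True


async def worker(client, delay):
    # The worker uses the shared client but never opens or closes it.
    client.active_workers += 1
    try:
        await asyncio.sleep(delay)  # placeholder for the receive/complete loop
    finally:
        client.active_workers -= 1


async def run_workers(client, delays):
    # Open the shared client once; it is closed only after every worker returns.
    async with client:
        await asyncio.gather(*(worker(client, d) for d in delays))


stub = StubClient()
asyncio.run(run_workers(stub, [0.01, 0.02, 0.03]))
print(stub.close_calls, stub.closed_while_busy)
```

Applied to the question's code, this would mean moving `async with servicebus_client:` out of `dlq_receiver` and into `dlq_multiple_client`, around the `asyncio.gather` call.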

As described in the linked reference, calling the delete method on the Message object completes the second step of the receive process: the message is marked as consumed and removed from the subscription.

msg = bus_service.receive_subscription_message('mytopic', 'LowMessages', peek_lock=True)  
print(msg.body)  
  
msg.delete()

  • Manual deletion using Service Bus Explorer.

In Service Bus Explorer, connect to your Azure Service Bus namespace (File > Connect > enter the connection string), open Receive messages, select Peek in the receive mode, and set the number of messages to delete as needed, as described in the linked reference.

Suresh Chikkam

Note that, for some reason, deleting DLQ messages from my topics only works when the batch size is 1. Receiving one message at a time in a loop solved the problem.
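
A minimal sketch of such a one-message-at-a-time loop is shown here, assuming the same `receive_messages` and `complete_message` receiver methods and the `enqueued_time_utc` attribute used in the question. `StubReceiver` and `delete_older_than` are hypothetical names introduced for illustration; the stub replaces the real subscription receiver so the example runs without a Service Bus namespace.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


class StubReceiver:
    """Hypothetical stand-in exposing the two receiver methods used in the question."""

    def __init__(self, messages):
        self._pending = list(messages)
        self.completed = []

    async def receive_messages(self, max_message_count=1, max_wait_time=None):
        batch = self._pending[:max_message_count]
        self._pending = self._pending[max_message_count:]
        return batch

    async def complete_message(self, message):
        self.completed.append(message)


async def delete_older_than(receiver, cutoff, max_wait_time=5):
    """Receive one message per call and complete those enqueued before cutoff."""
    deleted = 0
    while True:
        batch = await receiver.receive_messages(
            max_message_count=1, max_wait_time=max_wait_time
        )
        if not batch:
            return deleted  # an empty batch means nothing more was received
        for msg in batch:
            if msg.enqueued_time_utc < cutoff:
                await receiver.complete_message(msg)
                deleted += 1


now = datetime.now(timezone.utc)
cutoff = now - timedelta(days=2)
# Three fake messages enqueued 5, 1 and 3 days ago.
fake_messages = [
    SimpleNamespace(enqueued_time_utc=now - timedelta(days=age), sequence_number=i)
    for i, age in enumerate([5, 1, 3])
]
receiver = StubReceiver(fake_messages)
deleted_count = asyncio.run(delete_older_than(receiver, cutoff))
print(deleted_count)
```

With a real receiver, messages that are skipped rather than completed stay locked until their lock expires and may then be received again, so a stop condition such as the question's processed-message counter is still worth keeping.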

Mixie
  • As it’s currently written, your answer is unclear. Please [edit] to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers [in the help center](/help/how-to-answer). – Community Aug 28 '23 at 19:22