Helo, I have the next code that delete a dlq messages from Azure Service Bus Topics:
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusSubQueue
from azure.servicebus.management import ServiceBusAdministrationClient
from datetime import datetime
import pytz
import os
from dateutil.relativedelta import relativedelta
NAMESPACE_CONNECTION_STR = os.environ.get('NAMESPACE_CONNECTION_STR')
servicebus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(NAMESPACE_CONNECTION_STR)
TOPIC_NAME = os.environ.get('TOPIC_NAME')
SUBSCRIPTION_NAME = os.environ.get('SUBSCRIPTION_NAME')
DAYS=os.environ.get('DAYS')
TARGET_DATE = datetime.now(tz=pytz.UTC) + relativedelta(hours=-int(DAYS)) # "-2" month old datetime from now.
TOPIC_INFO_BEFORE = servicebus_mgmt_client.get_subscription_runtime_properties(TOPIC_NAME,SUBSCRIPTION_NAME).dead_letter_message_count
DELETE_COUNTER = 0
TOTAL_PROCESSED = 0
if TOPIC_NAME is None or SUBSCRIPTION_NAME is None:
raise ValueError('TOPIC_NAME and SUBSCRIPTION_NAME must be passed!')
print(f"All messages that have Enqueued Time less than {TARGET_DATE} will be deleted.")
print(f"Number of dead-letter messages before deletion: {TOPIC_INFO_BEFORE}")
async def dlq_receiver(servicebus_client, topic_name, subscription_name):
async with servicebus_client:
global TOTAL_PROCESSED
global DELETE_COUNTER
receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, subscription_name=SUBSCRIPTION_NAME, sub_queue=ServiceBusSubQueue.DEAD_LETTER)
print("EntityPath: " + receiver.entity_path)
messages = []
async with receiver:
while (True):
try:
received_msgs = await receiver.receive_messages(max_wait_time=5, max_message_count=10)
except asyncio.TimeoutError:
print("Timed out while waiting for messages")
continue
TOTAL_PROCESSED += len(received_msgs)
if len(received_msgs) == 0 or TOTAL_PROCESSED >= TOPIC_INFO_BEFORE:
print(f"Finished")
return
for msg in received_msgs:
if msg.enqueued_time_utc < TARGET_DATE:
messages.append(msg)
else:
print("Message is not processed due to EnqueuedDatetime: " + str(msg.enqueued_time_utc) + " sequenceNo.: " + str(msg.sequence_number))
for mes in messages:
print("Processed message EnqueuedDatetime: " + str(mes.enqueued_time_utc) + " sequenceNo.: " + str(mes.sequence_number))
DELETE_COUNTER += 1
await receiver.complete_message(mes)
messages.clear()
async def dlq_multiple_client(connection_string, topic_name, subscription_name):
# Can increase this count to run more receiver clients.
concurrent_receivers = 5
client = ServiceBusClient.from_connection_string(connection_string)
receiver_clients = [dlq_receiver(client, topic_name, subscription_name) for _ in range(concurrent_receivers)]
await asyncio.gather(*receiver_clients)
if __name__ == '__main__':
asyncio.run(dlq_multiple_client(NAMESPACE_CONNECTION_STR, TOPIC_NAME, SUBSCRIPTION_NAME))
print(f"Clean up is completed! Total count of deleted messages: {DELETE_COUNTER}")
TOPIC_INFO_AFTER = servicebus_mgmt_client.get_subscription_runtime_properties(TOPIC_NAME,SUBSCRIPTION_NAME).dead_letter_message_count
print(f"Messages left in Dead Letter Queue: {TOPIC_INFO_AFTER}")
Sometimes it executes successfully but sometimes at the end I receive an error:
"Traceback (most recent call last):
File "C:\\Users\\Davyd_Derkach\\PycharmProjects\\pythonProject\\main.py", line 66, in \<module\>
asyncio.run(dlq_multiple_client(NAMESPACE_CONNECTION_STR, TOPIC_NAME, SUBSCRIPTION_NAME))
File "C:\\Users\\Davyd_Derkach\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\\Users\\Davyd_Derkach\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\base_events.py", line 647, in run_until_complete
return future.result()
File "C:\\Users\\Davyd_Derkach\\PycharmProjects\\pythonProject\\main.py", line 62, in dlq_multiple_client
await asyncio.gather(\*receiver_clients)
File "C:\\Users\\Davyd_Derkach\\PycharmProjects\\pythonProject\\main.py", line 41, in dlq_receiver
return
File "C:\\Users\\Davyd_Derkach\\PycharmProjects\\pythonProject\\venv\\lib\\site-packages\\azure\\servicebus\\aio_servicebus_client_async.py", line 136, in __aexit__
await self.close()
File "C:\\Users\\Davyd_Derkach\\PycharmProjects\\pythonProject\\venv\\lib\\site-packages\\azure\\servicebus\\aio_servicebus_client_async.py", line 224, in close
for handler in self.\_handlers:
File "C:\\Users\\Davyd_Derkach\\AppData\\Local\\Programs\\Python\\Python39\\lib_weakrefset.py", line 65, in __iter__
for itemref in self.data:
RuntimeError: Set changed size during iteration"
I noticed that it happen sometime when I change count concurrent_receivers = 5
or change max_wait_time or max_message_count in (max_wait_time=5, max_message_count=10).
May someone help with this?
From suggestion that I found in Internet I added like a copy of my set to iterate over it:
for mes in messages:
print("Processed message EnqueuedDatetime: " + str(mes.enqueued_time_utc) + sequenceNo.: " + str(mes.sequence_number))
DELETE_COUNTER += 1
await receiver.complete_message(mes)
messages.clear()