I am trying to send a large number of messages (tens of millions) to Azure using the Python azure.storage.queue library, but it is taking a very long time. The code I am using is below:

from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy
)

messages = ["example message"]  # placeholder for the real list of messages
connectionString = "example connection string"
queueName = "example-queue-name"

queueClient = QueueClient.from_connection_string(connectionString, queueName)
for message in messages:
    queueClient.send_message(message)

Currently it takes around 3 hours to send around 70,000 messages, which is far too slow given the number of messages that need to be sent.
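To put those figures in perspective, here is a rough back-of-the-envelope calculation. It uses only the numbers above and assumes the send rate stays constant; the 10 million total is just an illustrative stand-in for "tens of millions".

```python
# Figures taken from the question: about 70,000 messages in about 3 hours.
messages_sent = 70_000
elapsed_seconds = 3 * 60 * 60

rate_per_second = messages_sent / elapsed_seconds      # messages per second
seconds_per_message = elapsed_seconds / messages_sent  # average time per send


def days_to_send(total_messages: int, rate: float) -> float:
    """Estimated days needed to send total_messages at a constant rate (msg/s)."""
    return total_messages / rate / 86_400


print(f"{rate_per_second:.2f} messages/s, {seconds_per_message * 1000:.0f} ms per message")
print(f"10 million messages: about {days_to_send(10_000_000, rate_per_second):.1f} days")
```

At roughly 6.5 messages per second, each send costs about 154 ms, which suggests the loop is dominated by per-request network latency rather than by CPU work.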

I have looked through the documentation to try and find a batch option, but none seems to exist: https://learn.microsoft.com/en-us/python/api/azure-storage-queue/azure.storage.queue.queueclient?view=azure-python

I also wondered if anyone has experience using the asyncio library to speed this process up and could suggest how to use it?
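For reference, a minimal sketch of what an asyncio approach could look like. The helper name send_all_async and the concurrency value are hypothetical, and `send` is any coroutine function that sends one message. With the async client (assumed here to be azure.storage.queue.aio.QueueClient, whose send_message is awaitable and which needs an async HTTP transport such as aiohttp installed), `send` could be queue_client.send_message. The demo uses a stand-in sender so that it runs without Azure.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def send_all_async(
    messages: Iterable[str],
    send: Callable[[str], Awaitable[object]],
    concurrency: int = 32,
) -> int:
    """Send every message with at most `concurrency` sends in flight; return the count."""
    iterator = iter(messages)  # shared by all workers, so each message is taken once
    sent = 0

    async def worker() -> None:
        nonlocal sent
        for message in iterator:
            await send(message)
            sent += 1

    # A fixed number of workers keeps memory bounded even for very long inputs.
    await asyncio.gather(*(worker() for _ in range(concurrency)))
    return sent


if __name__ == "__main__":
    async def fake_send(message: str) -> None:
        # Stand-in for a real network call (assumption: the real sender is awaitable).
        await asyncio.sleep(0.01)

    total = asyncio.run(send_all_async((f"msg {i}" for i in range(200)), fake_send))
    print(f"sent {total} messages")
```

A fixed pool of workers pulling from one iterator avoids creating millions of tasks up front. The right concurrency value depends on the service's throughput limits and would need to be found by experiment.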

petgeo

1 Answer

Try this:

from concurrent.futures import ProcessPoolExecutor
import time

from azure.storage.queue import QueueClient

connectionString = "<conn str>"
queueName = "<queue name>"


def pushThread(messages):
    # Create the client inside the worker so each process opens its own connection.
    queueClient = QueueClient.from_connection_string(connectionString, queueName)
    for message in messages:
        queueClient.send_message(message)
    # Return a count so that future.result() carries something meaningful.
    return len(messages)


def callback_function(future):
    print('Callback with the following result', future.result())


def main():
    messages = []  # fill with the messages to send

    # Split the list into two halves, one per worker process.
    messagesP1 = messages[:len(messages)//2]
    messagesP2 = messages[len(messages)//2:]
    print(len(messagesP1))
    print(len(messagesP2))

    tic = time.perf_counter()
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(pushThread, messagesP1)
        future.add_done_callback(callback_function)
        future2 = executor.submit(pushThread, messagesP2)
        # result() blocks until the task finishes, so no polling loop is needed.
        print(future.result(), future2.result())
    toc = time.perf_counter()

    print(f"spent {toc - tic:0.4f} seconds")


# The guard keeps worker processes from re-running main() when they import this module.
if __name__ == '__main__':
    main()

As you can see, I split the message list into 2 parts and use 2 tasks to push data into the queue concurrently. In my test with about 800 messages, pushing all of them without this change took 94 s. With the approach above, it took 48 s.
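The same split-and-send idea can be generalised to more than two workers. Below is a minimal sketch that swaps in a thread pool for the process pool, on the assumption that sending is network-bound so threads are enough. The helper names (chunked, send_chunk, send_all_threaded) and the worker count are hypothetical, and `send` is any callable that sends one message, for example queueClient.send_message. This assumes a single client can safely be shared between threads; if in doubt, create one client per thread.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def chunked(items: Sequence[str], n_chunks: int) -> List[Sequence[str]]:
    """Split items into n_chunks contiguous slices of nearly equal size."""
    size, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def send_chunk(chunk: Sequence[str], send: Callable[[str], object]) -> int:
    """Send every message in one chunk sequentially and return how many were sent."""
    for message in chunk:
        send(message)
    return len(chunk)


def send_all_threaded(
    messages: Sequence[str], send: Callable[[str], object], workers: int = 8
) -> int:
    """Send all messages using `workers` threads, one chunk per thread."""
    with ThreadPoolExecutor(max_workers=workers) as executor:
        counts = executor.map(lambda c: send_chunk(c, send), chunked(messages, workers))
        return sum(counts)


if __name__ == "__main__":
    outbox = []  # stand-in for the queue, so the demo runs without Azure
    total = send_all_threaded([f"msg {i}" for i in range(100)], outbox.append, workers=4)
    print(f"sent {total} messages")
```

Threads avoid the pickling and start-up cost of separate processes. Whether more workers keep helping depends on the storage account's throughput limits, so the worker count is something to tune by measurement.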

Stanley Gong
  • Hey Stanley, thanks so much for this it looks exactly like what I was after. I have been very busy but plan to try and implement this today or tomorrow and will come back to you with feedback then! – petgeo Nov 12 '20 at 09:18
  • @petgeo, hello, how's it going now? Is my post helpful? – Stanley Gong Nov 18 '20 at 01:48
  • Hey Stanley, I tried to implement this solution but when it ran it failed saying that there was no result for future. I then tried to run it without the callback feature and it ran with no error but no messages were actually generated unfortunately. I had to reset to the old version but I can rerun and get more info for you if that is helpful – petgeo Nov 18 '20 at 22:22
  • @petgeo, how's it going? I believe you have solved your issue :) – Stanley Gong Nov 25 '20 at 02:19
  • @Stanley Gong, have you got a minute? I have a similar issue and am happy to post another question if you need me to – wwnde Jan 08 '21 at 04:08
  • @wwnde, sure, just let me know the question link – Stanley Gong Jan 08 '21 at 04:20
  • Not yet posted, happy to post if you wish to help. I have a queue trigger; when I go into the queue in Azure and enter the message manually, it fires. However, if I use a timer trigger to write the message, it does not fire and execute even though the message is written. I suspect I am not enforcing the encode_policy like I should. Is this something you can help with? – wwnde Jan 08 '21 at 04:23
  • @wwnde, sure, willing to try and help :) – Stanley Gong Jan 08 '21 at 04:28
  • @Stanley Gong Invited you to chat room – wwnde Jan 08 '21 at 04:36
  • @Stanley Gong https://stackoverflow.com/questions/65624035/timer-trigger-does-not-trigger-queue-but-manual-entry-does-python – wwnde Jan 08 '21 at 05:51
  • @wwnde, my friend Hury is researching this. He is skilled at it, no worries. – Stanley Gong Jan 08 '21 at 06:55