import threading
from azure.storage.blob import BlockBlobService

def do_other_stuff():
    print("so much stuff to do")

class ABlob:
    def __init__(self, account_name, account_key, container_name, blob_name, file_path):
        self.account_name = account_name
        self.account_key = account_key
        self.container_name = container_name
        self.blob_name = blob_name
        self.file_path = file_path
        self.blob_service = BlockBlobService(account_name=self.account_name, account_key=self.account_key)

    def get_blob(self):
        # start the download on a background thread so the caller is not blocked;
        # note that nothing here reports whether the download succeeded
        download_thread = threading.Thread(
            target=self.blob_service.get_blob_to_path,
            args=(self.container_name, self.blob_name, self.file_path))
        download_thread.start()

    def get_blob_name(self):
        print(self.blob_name)


first_blob = ABlob(account_name='account_name',
                   account_key='key',
                   container_name='container', blob_name='something.csv',
                   file_path='path')


first_blob.get_blob()
first_blob.get_blob_name()
do_other_stuff()

I have Azure Blobs that need to download and upload (not shown). I do not want to wait for them to finish, because I have other work to do in the meantime. At some point, though, I will need to confirm whether they downloaded or uploaded successfully.

My current code uses the threading library. If an error happens during the upload or download, the thread handling the transfer exits with an error, and I have no way to report completion, or the status of that completion, back to the main thread.

What do I need to do to be able to get the status of get_blob? Is there another library that handles this situation less dangerously? I have referenced the following threads but cannot figure out how to combine their different approaches.

Catch a thread's exception in the caller thread in Python

python multiprocessing pool retries

How to call a async function contained in a class

background function in Python

supertommy

1 Answer


What do I need to do to be able to get the status of get_blob?

You can wrap get_blob in a function that records whether it succeeded and stores the return value, if any. Instead of target=self.blob_service.get_blob_to_path, you would write target=self._get_blob_background. The new _get_blob_background method calls self.result = self.blob_service.get_blob_to_path(...) inside a try/except Exception as e block and, on failure, stores self.result_exception = e, so the main thread can distinguish a successful result from an exception.
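
A minimal sketch of that wrapper, following the description above (the result and result_exception attribute names are illustrative, not part of the SDK):

def get_blob(self):
    self.result = None
    self.result_exception = None
    download_thread = threading.Thread(target=self._get_blob_background)
    download_thread.start()

def _get_blob_background(self):
    try:
        # get_blob_to_path downloads the blob and returns its metadata on success
        self.result = self.blob_service.get_blob_to_path(
            self.container_name, self.blob_name, self.file_path)
    except Exception as e:
        # keep the exception so the main thread can inspect it later
        self.result_exception = e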

Even better, you can use the concurrent.futures library to do all that for you:

import concurrent.futures

# one module-level pool, shared by every instance of the class
pool = concurrent.futures.ThreadPoolExecutor()

def get_blob(self):
    return pool.submit(self.blob_service.get_blob_to_path,
                       self.container_name, self.blob_name, self.file_path)

Now get_blob() will run in the background, like in your code, but here it will return a Future object that you can query to find out whether the job completed, and how it completed.
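
For example, a sketch of how the returned Future might be checked later, reusing first_blob and do_other_stuff from the question:

future = first_blob.get_blob()
do_other_stuff()

if future.done():
    print("download already finished")

try:
    future.result()  # blocks until the download finishes, re-raises any exception
    print("download succeeded")
except Exception as e:
    print("download failed:", e)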

user4815162342
  • I had considered your first approach but thought it might be too sloppy. The `concurrent.futures` library is definitely what I was looking for. Is there a consequence to having one pool object per instance of my class? Or should the pool live at a higher level than a per-instance basis? – supertommy May 02 '18 at 15:59
  • @supertommy An executor maintains a pool of threads or processes, spawning and destroying them on demand. If you create an executor for each instance, you are responsible for calling `shutdown()` on each executor to ensure its threads are released. Also, a new thread will be spawned on each call, which is exactly the overhead a thread pool is designed to avoid. – user4815162342 May 03 '18 at 04:38
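
A brief sketch of the shared, module-level pool described in that comment, with an explicit shutdown once the transfers are accounted for (first_blob and do_other_stuff are the names from the question; the overall structure is an assumption, not part of the answer):

import concurrent.futures

# one pool shared by every ABlob instance; worker threads are reused across calls
pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

future = first_blob.get_blob()
do_other_stuff()

future.result()           # surface any exception from the download
pool.shutdown(wait=True)  # release the worker threads once everything is done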