
I'm using ThreadPoolExecutor to run several queries of an API simultaneously. I'd like to return the results as they become available and perform some action on them. I found this post, which details the various methods for handling returned results from ThreadPoolExecutor, quite helpful. However, it seems like my code is still waiting for all of the API queries to finish before it performs the subsequent actions I'm requesting on the returns from the API. While my code works, I feel like I'm missing something. Is this the best way, or even the correct way, to structure this?

Here is an example of my code:

futures_lst = []
with ThreadPoolExecutor(max_workers=6) as executor:
  for i in df.index:
    future = executor.submit(api_get_function, single_api_input_variable)
    futures_lst.append(future)

for future in concurrent.futures.as_completed(futures_lst):
  # First run a function to format the results returned by the API
  result_df = format_columns_in(future.result())
  # Append the formatted results to a csv file as we go
  result_df.to_csv('result.csv', mode='a', index=True, header=True)
bengen343

1 Answer


You are using a with statement, which is why you are blocked until all futures have completed. with is a special statement that relies on the __enter__ and __exit__ methods: __enter__ is called when you enter the with block, and __exit__ is called automatically (by the interpreter) at the end of the with block.

When you look at the source code of the ThreadPoolExecutor class, you will see this __exit__ method (inherited from the Executor class):

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False

And when we look at the description of the shutdown method, it says:

"If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed."

That's why your code "is still waiting for all of the API queries to finish before it performs the subsequent actions": your as_completed loop sits after the with block, so it only starts once __exit__ has returned.
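To see the blocking behaviour in isolation, here is a minimal sketch. slow_task is a hypothetical stand-in for an API call, and the sleep durations are arbitrary; the point is only that the line after the with block is reached once every submitted task has finished.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds):
    # Hypothetical stand-in for an API call: just sleeps, then returns its input
    time.sleep(seconds)
    return seconds


start = time.monotonic()
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_task, s) for s in (0.1, 0.2, 0.3)]
    # submit() returns immediately, so we get here almost at once
    submitted_at = time.monotonic() - start

# Leaving the block calls shutdown(wait=True), so this line is only
# reached after the slowest task has finished.
left_block_at = time.monotonic() - start
all_done = all(f.done() for f in futures)

print(f"submitted after {submitted_at:.2f}s, "
      f"left the with block after {left_block_at:.2f}s, all done: {all_done}")
```

Any code placed after the with block, such as the as_completed loop in the question, therefore sees only finished futures.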

To solve this, you have a few options. The first option is to remove the with statement. That way, you will not be blocked:

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

futures_lst = []
executor = ThreadPoolExecutor(max_workers=6)
for i in df.index:
    future = executor.submit(api_get_function, single_api_input_variable)
    futures_lst.append(future)

for future in concurrent.futures.as_completed(futures_lst):
    # First run a function to format the results returned by the API
    result_df = format_columns_in(future.result())
    # Append the formatted results to a csv file as we go
    result_df.to_csv('result.csv', mode='a', index=True, header=True)

# Without a with statement, release the executor's threads explicitly
executor.shutdown()

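But in the option above, the processing of the results is not concurrent: you loop over the futures in the main thread as they complete, one at a time. So if two of them complete at the same time, the second one has to wait until the first has been processed.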
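A variation not shown above is to keep the with statement and move the as_completed loop inside it, so each result is handled while the other tasks are still running. The following is a minimal sketch under that assumption; fake_api_get is a hypothetical stand-in for api_get_function, and the delays are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_api_get(delay):
    # Hypothetical stand-in for api_get_function: sleeps, then returns its input
    time.sleep(delay)
    return delay


processed = []
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fake_api_get, d) for d in (0.3, 0.1, 0.2)]
    # Still inside the with block: the loop body runs in the main thread,
    # one result at a time, while the remaining tasks keep running.
    for future in as_completed(futures):
        processed.append(future.result())

# Results were appended in completion order, not submission order
print(processed)
```

As with the first option, the results are still processed sequentially in the main thread; only the API calls themselves overlap.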

If you want to process the results concurrently (not in parallel), you can also use the option below:

def custom_callback(future):
    result_df = format_columns_in(future.result())
    # Append the formatted results to a csv file as we go
    result_df.to_csv('result.csv', mode='a', index=True, header=True)


with ThreadPoolExecutor(max_workers=6) as executor:
    for i in df.index:
        future = executor.submit(api_get_function, single_api_input_variable)
        future.add_done_callback(custom_callback)

This solution is concurrent: as soon as a worker thread finishes its job, it calls the custom_callback function.

But this option is not thread-safe as written, because the callbacks run in several threads and all append to the same file, and I don't know what format_columns_in does. The call to result_df.to_csv requires extra locking (format_columns_in may also require locking, depending on what it does). Appending to a file can also be thread-safe in some cases; for more detail, please look at this.

You can make this option thread-safe like this:
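To check where the callback actually runs, the sketch below records the name of the thread that executes it. work and record_thread are hypothetical names, and thread_name_prefix is used only to make the worker threads easy to recognise. Note that if a future has already finished by the time add_done_callback is called, the callback runs immediately in the calling thread instead; the sleep here makes that unlikely.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

callback_threads = []


def work(x):
    # Hypothetical task; the sleep keeps the future pending
    # until after the callback has been attached
    time.sleep(0.2)
    return x * 2


def record_thread(future):
    # Remember which thread ran this callback
    callback_threads.append(threading.current_thread().name)


with ThreadPoolExecutor(max_workers=3, thread_name_prefix="worker") as executor:
    for i in range(3):
        executor.submit(work, i).add_done_callback(record_thread)

print(callback_threads)
```

Because several worker threads can be inside the callback at once, anything the callback touches (such as a shared output file) is accessed from multiple threads.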

import threading
from concurrent.futures import ThreadPoolExecutor

# One lock shared by all callbacks
global_lock = threading.Lock()


def custom_callback(future):
    result_df = format_columns_in(future.result())
    # Append the formatted results to a csv file as we go,
    # allowing only one thread at a time to write
    with global_lock:
        result_df.to_csv('result.csv', mode='a', index=True, header=True)


with ThreadPoolExecutor(max_workers=6) as executor:
    for i in df.index:
        future = executor.submit(api_get_function, single_api_input_variable)
        future.add_done_callback(custom_callback)
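For a version that runs without the question's df, api_get_function, or format_columns_in, here is a self-contained sketch of the same lock-protected callback pattern. It swaps pandas for the standard csv module and writes to a temporary directory; fake_api_get, append_row, write_lock, and out_path are all hypothetical names introduced for illustration.

```python
import csv
import os
import tempfile
import threading
from concurrent.futures import ThreadPoolExecutor

write_lock = threading.Lock()
out_path = os.path.join(tempfile.mkdtemp(), "result.csv")


def fake_api_get(n):
    # Hypothetical stand-in for api_get_function
    return {"id": n, "square": n * n}


def append_row(future):
    row = future.result()
    # Only one thread at a time may append to the file
    with write_lock:
        with open(out_path, "a", newline="") as f:
            csv.writer(f).writerow([row["id"], row["square"]])


with ThreadPoolExecutor(max_workers=4) as executor:
    for n in range(10):
        executor.submit(fake_api_get, n).add_done_callback(append_row)

# The with block has exited, so every task and its callback has finished.
# Rows were written in completion order, so sort them before inspecting.
with open(out_path, newline="") as f:
    rows = sorted(csv.reader(f), key=lambda r: int(r[0]))

print(len(rows), rows[3])
```

Keeping the formatting step outside the lock, as in the answer's version, means only the file write is serialized.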

For more information about the add_done_callback() method, see the documentation.

Veysel Olgun