You are using a with statement, which is why your code is blocked until all futures are completed. with is a special statement that works together with the __enter__ and __exit__ methods: __enter__ is called when you enter the with block, and __exit__ is called automatically (by the interpreter) at the end of the with block.
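To illustrate, a with block is roughly equivalent to calling these two methods by hand. A simplified sketch (it ignores the value returned by __enter__ and the exception details that would be passed to __exit__):

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=6)
with executor:
    pass  # body of the with block would go here

# behaves roughly like:
executor = ThreadPoolExecutor(max_workers=6)
executor.__enter__()
try:
    pass  # body of the with block would go here
finally:
    # called no matter what happened in the body
    executor.__exit__(None, None, None)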
When you look at the source code of the ThreadPoolExecutor class, you will see this __exit__ method (inherited from the Executor class):
def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
And when we look at the description of the shutdown() method, it says:
If wait is True then this method will not return until all the pending
futures are done executing and the resources associated with the
executor have been freed
That's why your code "is still waiting for all of the API queries to finish before it performs the subsequent actions".
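You can see this blocking behaviour with a small, self-contained example (my own sketch, not your code): the final print only runs after every submitted task has finished.

import time
from concurrent.futures import ThreadPoolExecutor

def slow_task(seconds):
    time.sleep(seconds)
    return seconds

start = time.time()
with ThreadPoolExecutor(max_workers=2) as executor:
    for _ in range(4):
        executor.submit(slow_task, 1)
# shutdown(wait=True) runs here, so we only get past this point once
# all four tasks are done (~2 seconds with 2 workers)
print(f"reached after {time.time() - start:.1f}s")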
To solve this, you have a couple of options. The first option is to remove the with statement. By doing this, you will not be blocked:
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

futures_lst = []
executor = ThreadPoolExecutor(max_workers=6)

for i in df.index:
    future = executor.submit(api_get_function, single_api_input_variable)
    futures_lst.append(future)

for future in concurrent.futures.as_completed(futures_lst):
    # First run a function to format the results returned by the API
    result_df = format_columns_in(future.result())
    # Append the formatted results to a csv file as we go
    result_df.to_csv('result.csv', mode='a', index=True, header=True)
But processing the results in the above option is not concurrent, because you loop over the futures one by one as they complete. What if two of them complete at the same time? If you want to do this processing concurrently (not in parallel), you can use the option below instead:
def custom_callback(future):
    result_df = format_columns_in(future.result())
    # Append the formatted results to a csv file as we go
    result_df.to_csv('result.csv', mode='a', index=True, header=True)

with ThreadPoolExecutor(max_workers=6) as executor:
    for i in df.index:
        future = executor.submit(api_get_function, single_api_input_variable)
        future.add_done_callback(custom_callback)
This solution is concurrent: whenever a worker thread finishes its job, it calls the custom_callback function. But this option is not thread-safe, because you are appending to a file from multiple threads, and I don't know what format_columns_in does. result_df.to_csv requires extra locking (format_columns_in may also require locking, depending on what it does). You can make this option thread-safe like below. (Appending to a file can also be made thread-safe on its own; for more detail, please look at this.)
import threading

global_lock = threading.Lock()

def custom_callback(future):
    result_df = format_columns_in(future.result())
    # Append the formatted results to a csv file as we go,
    # holding the lock so only one thread writes at a time
    with global_lock:
        result_df.to_csv('result.csv', mode='a', index=True, header=True)

with ThreadPoolExecutor(max_workers=6) as executor:
    for i in df.index:
        future = executor.submit(api_get_function, single_api_input_variable)
        future.add_done_callback(custom_callback)
For more info about the add_done_callback() method, you can see the doc.
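One more thing to watch out for with callbacks: future.result() re-raises any exception raised inside api_get_function, and an exception that escapes a done-callback is only logged and then ignored. A defensive variant of the callback (just a sketch, reusing the names from your code):

def custom_callback(future):
    try:
        result_df = format_columns_in(future.result())
        # Append the formatted results to a csv file as we go
        with global_lock:
            result_df.to_csv('result.csv', mode='a', index=True, header=True)
    except Exception as exc:
        # without this, the error would only appear in the logging output
        print(f"API call or formatting failed: {exc}")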