Each function (func1, func2, etc.) makes a request to a different URL:

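import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

def thread_map(ID):
    # Map an index to each request function (func1..func4 are defined elsewhere).
    func_switch = {
        0: func1,
        1: func2,
        2: func3,
        3: func4,
    }

    with ThreadPoolExecutor(max_workers=len(func_switch)) as threads:
        futures = [threads.submit(func_switch[i], ID) for i in func_switch]
        # as_completed yields futures in completion order, so results are unordered.
        results = [f.result() for f in as_completed(futures)]

        for df in results:
            if not df.empty and df['x'][0] != '':
                return df

        return pd.DataFrame()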

This is much faster (1.75 sec) compared to a for loop (4 sec), but the results are unordered.

  • How can each function be executed in parallel while still allowing the results to be checked in the order the functions were submitted?

Preferably as background processes/threads returning the corresponding DataFrames, starting with func1. So if the conditions for func1 are not met, check func2, and so on, given that the results have already been fetched in the background. Each DataFrame is different, but they all contain the same common column x.

Any suggestions are highly appreciated. I also hope ThreadPoolExecutor is appropriate for this scenario. Thanks!

Ava Barbilla
  • You can use the [map method](https://stackoverflow.com/questions/20838162/how-does-threadpoolexecutor-map-differ-from-threadpoolexecutor-submit) to return results in the order started rather than the order completed. – DarrylG May 18 '20 at 19:03
  • @DarrylG I consider the map method to have two issues. The first one, although minor, is that if the function being called takes something other than one argument, you then need to use something like a *lambda function* or a *partial*. The second one is that if any of the threads throws an exception, you will not be able to obtain the results of any subsequent threads once you retrieve the result from the exception-throwing thread. – Booboo May 18 '20 at 19:19

1 Answer

First, let's do it as you are asking:

with ThreadPoolExecutor(max_workers=len(func_switch)) as threads:
    futures = [threads.submit(func_switch[i], ID) for i in func_switch]
results = [f.result() for f in futures]

That was simple enough.
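Since the question wants to check the results starting with func1, one option is to walk the futures in submission order and stop at the first acceptable result. The following is only a minimal sketch under stated assumptions: the workers (`slow_miss`, `fast_hit`, `fast_other`) are hypothetical stand-ins that sleep instead of making requests and return plain dicts rather than DataFrames, and `is_acceptable` and `first_match` are illustrative names that do not appear in the original code.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_miss(ID):
    # Hypothetical worker: the slowest one, returning a result that fails the check.
    time.sleep(0.2)
    return {"source": "slow_miss", "x": ""}


def fast_hit(ID):
    # Hypothetical worker: finishes quickly with a usable value.
    time.sleep(0.05)
    return {"source": "fast_hit", "x": f"value-{ID}"}


def fast_other(ID):
    # Hypothetical worker: finishes first, but is submitted last.
    time.sleep(0.01)
    return {"source": "fast_other", "x": f"other-{ID}"}


def is_acceptable(result):
    # Stand-in for the `not df.empty and df['x'][0] != ''` test in the question.
    return bool(result) and result["x"] != ""


def first_match(funcs, ID):
    with ThreadPoolExecutor(max_workers=len(funcs)) as threads:
        # All workers start running in the background right away.
        futures = [threads.submit(func, ID) for func in funcs]
        # Iterate in submission order; result() blocks only until that future is done.
        for future in futures:
            result = future.result()
            if is_acceptable(result):
                return result
    return {}


match = first_match([slow_miss, fast_hit, fast_other], 7)
print(match["source"])  # prints "fast_hit"
```

Here `slow_miss` is checked first even though it finishes last; because its `x` is empty, the check falls through to `fast_hit`. Note that leaving the `with` block still waits for any workers that are still running.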

To process the futures as they are completed and still place the results in the list in the order in which the futures were created, you need to associate with each future the order in which it was created:

futures = {} # this time a dictionary
creation_order = 0
with ThreadPoolExecutor(max_workers=len(func_switch)) as threads:
    for i in func_switch:
        future = threads.submit(func_switch[i], ID)
        # Map the future to its creation order (or to any other value you want,
        # such as the arguments being passed to the function).
        futures[future] = creation_order
        creation_order += 1
results = [None] * creation_order # preallocate results
for f in as_completed(futures):
    result = f.result()
    index = futures[f] # recover the original creation order
    results[index] = result

Of course, if you wait for all the futures to complete before doing anything with them, there is no point in using the as_completed method. I just wanted to show, in case you do want to process results as they complete, how to associate a completed future back with its original creation order (or, perhaps more usefully, with the original arguments used in the call to the worker function that created the future). An alternative is for the worker function to return the passed arguments as part of its result.
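The last alternative, having the worker hand back its own position alongside its result, might look roughly like this. It is a sketch only: `tagged`, `ordered_results` and `make_worker` are hypothetical helpers introduced for illustration, and the workers sleep and return strings instead of making requests and returning DataFrames.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def make_worker(name, delay):
    # Hypothetical factory for stand-in workers with different run times.
    def worker(ID):
        time.sleep(delay)
        return f"{name}:{ID}"
    return worker


def tagged(index, func, ID):
    # Wrap the call so the result carries the position it was submitted in.
    return index, func(ID)


def ordered_results(funcs, ID):
    results = [None] * len(funcs)  # preallocate one slot per worker
    with ThreadPoolExecutor(max_workers=len(funcs)) as threads:
        futures = [threads.submit(tagged, i, func, ID) for i, func in enumerate(funcs)]
        # Futures arrive in completion order; the returned index restores the order.
        for future in as_completed(futures):
            index, result = future.result()
            results[index] = result
    return results


funcs = [make_worker("a", 0.1), make_worker("b", 0.01), make_worker("c", 0.05)]
print(ordered_results(funcs, 42))  # prints ['a:42', 'b:42', 'c:42']
```

With this approach no dictionary from futures to indices is needed, at the cost of wrapping each worker call.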

Booboo
  • Thank you @Booboo, works like a charm; the results are ordered. +1 for the preallocation, makes sense. – Ava Barbilla May 18 '20 at 19:50
  • And now I am able to follow your code better (I must have had a mental lapse before). I am still not sure why `func_switch` is not a simple list rather than a dictionary. – Booboo May 18 '20 at 20:00