The answer by dnswlt works well but can still be improved: if the request to the REST API (or whatever else is done with each record) takes a variable amount of time, some workers may sit idle while the slowest request of each batch is still running.
The following solution takes a generator and a function as input and applies the function to each element produced by the generator, while maintaining a given number of running threads (each of which applies the function to one element). At the same time, it still returns the results in the order of the input.
from concurrent.futures import ThreadPoolExecutor
import os
import random
import time


def map_async(iterable, func, max_workers=os.cpu_count()):
    # Generator that applies func to the input using max_workers concurrent jobs
    def async_iterator():
        iterator = iter(iterable)
        pending_results = []
        has_input = True
        thread_pool = ThreadPoolExecutor(max_workers)
        while True:
            # Submit jobs for remaining input until max_workers jobs are running
            while (has_input and
                   len([e for e in pending_results if e.running()]) < max_workers):
                try:
                    e = next(iterator)
                    print('Submitting task...')
                    pending_results.append(thread_pool.submit(func, e))
                except StopIteration:
                    print('Submitted all tasks.')
                    has_input = False
            # If there are no pending results, the generator is done
            if not pending_results:
                return
            # If the oldest job is done, return its value
            if pending_results[0].done():
                yield pending_results.pop(0).result()
            # Otherwise, yield the CPU, then continue starting new jobs
            else:
                time.sleep(.01)
    return async_iterator()


def example_generator():
    for i in range(20):
        print('Creating task', i)
        yield i


def do_work(i):
    print('Starting to work on', i)
    time.sleep(random.uniform(0, 3))
    print('Done with', i)
    return i


random.seed(42)
for i in map_async(example_generator(), do_work):
    print('Got result:', i)
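For comparison, ThreadPoolExecutor.map also yields results in input order, but (at least in CPython's current implementation) it submits a task for every input element up front, so it does not bound the number of outstanding tasks the way map_async does. A minimal sketch of the built-in alternative:

with ThreadPoolExecutor(max_workers=8) as thread_pool:
    # map() preserves the input order, but it eagerly consumes
    # example_generator() and submits all 20 tasks immediately
    for result in thread_pool.map(do_work, example_generator()):
        print('Got result:', result)

For a short, finite input this is fine; for a long or infinite generator, that eager submission is exactly what map_async avoids.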
The commented output of one possible run of map_async (on a machine with 8 logical CPUs):
Creating task 0
Submitting task...
Starting to work on 0
Creating task 1
Submitting task...
Starting to work on 1
Creating task 2
Submitting task...
Starting to work on 2
Creating task 3
Submitting task...
Starting to work on 3
Creating task 4
Submitting task...
Starting to work on 4
Creating task 5
Submitting task...
Starting to work on 5
Creating task 6
Submitting task...
Starting to work on 6
Creating task 7
Submitting task...
Starting to work on 7 # This point is reached quickly: 8 jobs are started before any of them finishes
Done with 1 # Job 1 is done, but since job 0 is not, the result is not returned yet
Creating task 8 # Job 1 finished, so a new job can be started
Submitting task...
Creating task 9
Starting to work on 8
Submitting task...
Done with 7
Starting to work on 9
Done with 9
Creating task 10
Submitting task...
Creating task 11
Starting to work on 10
Submitting task...
Done with 3
Starting to work on 11
Done with 2
Creating task 12
Submitting task...
Creating task 13
Starting to work on 12
Submitting task...
Done with 12
Starting to work on 13
Done with 10
Creating task 14
Submitting task...
Creating task 15
Starting to work on 14
Submitting task...
Done with 8
Starting to work on 15
Done with 13 # Several other jobs are started and completed
Creating task 16
Submitting task...
Creating task 17
Starting to work on 16
Submitting task...
Done with 0 # Finally, job 0 is completed
Starting to work on 17
Got result: 0
Got result: 1
Got result: 2
Got result: 3 # The results of all completed jobs are returned in input order, until a job that is still running is reached
Done with 5
Creating task 18
Submitting task...
Creating task 19
Starting to work on 18
Submitting task...
Done with 16
Starting to work on 19
Done with 11
Submitted all tasks.
Done with 19
Done with 4
Got result: 4
Got result: 5
Done with 6
Got result: 6 # Job 6 must have been a very long job; now that it's done, its result and the results of many subsequent jobs can be returned
Got result: 7
Got result: 8
Got result: 9
Got result: 10
Got result: 11
Got result: 12
Got result: 13
Done with 14
Got result: 14
Done with 15
Got result: 15
Got result: 16
Done with 17
Got result: 17
Done with 18
Got result: 18
Got result: 19
The above run took about 4.7s, while the sequential execution (setting max_workers=1) took about 23.6s. Without the optimization that avoids waiting for the slowest execution of each batch, the execution takes about 5.3s. Depending on the variation of the individual job times and on max_workers, the effect of the optimization may be even larger.
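As a possible refinement, the 10 ms polling sleep can be replaced by blocking on concurrent.futures.wait with return_when=FIRST_COMPLETED, so the loop wakes up exactly when some job finishes instead of at a fixed interval. A sketch of this variant (map_async_no_polling is just an illustrative name; the logic is otherwise the same as above):

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import os

def map_async_no_polling(iterable, func, max_workers=os.cpu_count()):
    # Same contract as map_async, but blocks on wait() instead of sleeping
    def async_iterator():
        iterator = iter(iterable)
        pending_results = []
        has_input = True
        with ThreadPoolExecutor(max_workers) as thread_pool:
            while pending_results or has_input:
                # Keep at most max_workers unfinished jobs in flight
                while has_input and \
                        sum(not f.done() for f in pending_results) < max_workers:
                    try:
                        pending_results.append(thread_pool.submit(func, next(iterator)))
                    except StopIteration:
                        has_input = False
                # Yield all results that are ready, in input order
                while pending_results and pending_results[0].done():
                    yield pending_results.pop(0).result()
                # Block until any unfinished job completes (this replaces
                # the time.sleep(.01) polling loop)
                not_done = [f for f in pending_results if not f.done()]
                if not_done:
                    wait(not_done, return_when=FIRST_COMPLETED)
    return async_iterator()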