
I have a lot of tasks (independent of each other, represented by some code in Python) that need to be executed. Their execution time varies. I also have limited resources so at most N tasks can be running at the same time. The goal is to finish executing the whole stack of tasks as fast as possible.

It seems that I am looking for some kind of manager that starts new tasks when the resource gets available and collects finished tasks.

  • Are there any already-made solutions or should I code it myself?
  • Are there any caveats that I should keep in mind?
Jeyekomon
  • Possible duplicate of [Filling a queue and managing multiprocessing in python](https://stackoverflow.com/questions/17241663/filling-a-queue-and-managing-multiprocessing-in-python) – tevemadar Nov 27 '19 at 13:21
  • you can just use `multiprocessing.Pool` – Maxxik CZ Nov 27 '19 at 13:56

2 Answers


As far as I can tell, your `main` would just become:

import multiprocessing

# sleep and POOL_SIZE as defined in your original code
def main():
    tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with multiprocessing.Pool(POOL_SIZE) as pool:
        # map blocks until every task has finished
        pool.map(sleep, tasks)

i.e. you've just reimplemented a pool, but inefficiently (`Pool` reuses processes where possible) and less safely; `Pool` goes to a lot of effort to clean up on exceptions.
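For illustration, here is a small self-contained sketch (not part of the original answer) that makes the reuse visible: each task returns the PID of the worker process that ran it, so ten tasks map onto at most four distinct PIDs:

import multiprocessing
import os
import time


def sleep(seconds: int) -> int:
    time.sleep(seconds)
    return os.getpid()  # report which worker process ran this task


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        pids = pool.map(sleep, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    # Ten tasks, but at most four distinct PIDs: the pool reuses its
    # worker processes instead of spawning a fresh one per task.
    print(sorted(set(pids)))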

Sam Mason
  • Well that's embarrassing. Parallel programming is a big mental step - your code could hardly be simpler, and I still have a hard time imagining what's actually happening behind those one or two lines. Still, it helped me a lot, thank you. – Jeyekomon Nov 28 '19 at 19:28
  • @Jeyekomon the joy of open source code is it's all there if you want to [find out](https://github.com/python/cpython/blob/3.8/Lib/multiprocessing/pool.py) :) – Sam Mason Nov 28 '19 at 22:03

Here is a simple code snippet that should fit the requirements:

import multiprocessing
import time

POOL_SIZE = 4
STEP = 1


def sleep(seconds: int):
    time.sleep(seconds)


def main():
    tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    pool = [None] * POOL_SIZE

    while tasks or any(item is not None for item in pool):
        for i in range(len(pool)):
            if pool[i] is not None and not pool[i].is_alive():
                # Finished task. Clear the resource.
                pool[i] = None

            if pool[i] is None:
                # Free resource. Start new task if any are left.
                if tasks:
                    task = tasks.pop(0)
                    pool[i] = multiprocessing.Process(target=sleep, args=(task,))
                    pool[i].start()

        time.sleep(STEP)


if __name__ == '__main__':
    main()

The manager has a `tasks` list of arbitrary length; here, for simplicity, the tasks are represented by integers that are passed as arguments to a sleep function. It also has a `pool` list of length POOL_SIZE, initially filled with None placeholders, representing the available resource slots.

The manager periodically visits all currently running processes and checks whether they have finished. It also starts new processes when a slot becomes available. The whole cycle repeats until there are no tasks and no running processes left. The STEP value is there to save computing power - you generally don't need to check the running processes every millisecond.
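If you want to avoid fixed-interval polling entirely, a variant of the same manager (a sketch, not part of the original answer) can block on the processes' `sentinel` handles via `multiprocessing.connection.wait` (Python 3.3+), waking up exactly when a process exits:

import multiprocessing
from multiprocessing.connection import wait
import time

POOL_SIZE = 4


def sleep(seconds: int):
    time.sleep(seconds)


def main():
    tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    running = []
    while tasks or running:
        # Fill any free slots with new processes.
        while tasks and len(running) < POOL_SIZE:
            process = multiprocessing.Process(target=sleep, args=(tasks.pop(0),))
            process.start()
            running.append(process)
        # Block until at least one process exits, instead of
        # waking up every STEP seconds to poll.
        wait([process.sentinel for process in running])
        for process in running:
            process.join(timeout=0)  # reap finished processes
        running = [process for process in running if process.exitcode is None]


if __name__ == '__main__':
    main()

The trade-off is a little more bookkeeping in exchange for no idle wake-ups between task completions.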

As for the caveats, the `multiprocessing` docs have a set of [programming guidelines](https://docs.python.org/3/library/multiprocessing.html#programming-guidelines) that should be kept in mind.
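For example, one guideline that bites early (a hypothetical illustration, not from the original answer): the target function and its arguments must be picklable, so lambdas and locally defined functions cannot be submitted to a pool, and the entry point needs the `if __name__ == '__main__':` guard on platforms that spawn a fresh interpreter:

import multiprocessing


def double(x: int) -> int:
    return 2 * x


if __name__ == '__main__':
    with multiprocessing.Pool(2) as pool:
        print(pool.map(double, range(5)))      # works: top-level function
        # pool.map(lambda x: 2 * x, range(5))  # raises PicklingError:
        #                                      # lambdas can't be pickled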

Jeyekomon
  • why aren't you using the suggested `Pool().map` API? – Sam Mason Nov 27 '19 at 15:56
  • @SamMason I just started learning the `multiprocessing` module and all those `Pipe`, `Queue`, `Event`, `Barrier`, `Semaphore`, `Manager`, ... classes are still a bit too much for me to comprehend. I managed to implement the functionality from scratch, but I was still interested in whether (and how) this can be implemented using the `multiprocessing` classes. That's why I posted this question. – Jeyekomon Nov 27 '19 at 16:59