
Is there a map-like method that doesn't load all sub-processes into memory at once? Instead, if the machine has four CPU threads, it should first load four processes and execute them; when one of the four finishes, it should load another in its place.

pool.map in the standard library loads all of the jobs at once and executes them in arbitrary order. With a large number of jobs, memory overflows.

I have read the official Python 3 documentation but haven't found anything related yet.

The feature I need is quite specific; I don't think any third-party library implements it deliberately.

What I expect:

Suppose a four-core computer runs:

y = XXX.map(f, range(1, 100))

As long as f(1) through f(4) haven't finished, f(5) should not exist in system memory. When one of these four tasks finishes, for example f(2), f(5) should be loaded in its place.

A note on the function f: it is a heavy memory consumer, and each running instance of it takes up a huge amount of memory.
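
For illustration, the behavior I want could be hand-rolled with concurrent.futures (a rough sketch of the intended semantics, not a solution I'm satisfied with; the point is that at most four tasks ever exist at once):

import concurrent.futures
import itertools

def f(x):
    return x * x  # stand-in for the real memory-hungry function

if __name__ == "__main__":
    inputs = iter(range(1, 100))
    results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
        # seed at most four tasks; later inputs don't exist in memory yet
        pending = {ex.submit(f, x) for x in itertools.islice(inputs, 4)}
        while pending:
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED)
            for fut in done:
                results.append(fut.result())  # note: arrives out of order
                for x in itertools.islice(inputs, 1):
                    # replace the finished task with the next input
                    pending.add(ex.submit(f, x))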

davmos
  • Check out imap in the multiprocessing module – SimonF Jan 05 '19 at 09:49
  • Possible duplicate of [Using multiprocessing.Process with a maximum number of simultaneous processes](https://stackoverflow.com/questions/20886565/using-multiprocessing-process-with-a-maximum-number-of-simultaneous-processes) – Ulrich Eckhardt Jan 05 '19 at 11:01
  • @UlrichEckhardt, I think they are different, please see what I added in the post. – davmos Jan 05 '19 at 20:07
  • @SimonF, according to the official documentation, the difference between 'imap' and 'map' is "For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1." So I don't think it is what I am looking for. – davmos Jan 05 '19 at 20:11
  • @davmos I think you are misunderstanding the documentation, just as I misunderstood your question. Read this link for the difference between map and imap: https://stackoverflow.com/questions/26520781/multiprocessing-pool-whats-the-difference-between-map-async-and-imap – SimonF Jan 06 '19 at 08:35
  • So to be clear, you only want to perform 4 calculations at once, and then the first process that finishes grabs the next one? – SimonF Jan 06 '19 at 08:37

2 Answers


First of all, the idea that map keeps all subprocesses in memory is incorrect: map keeps the entire iterable (the input) in memory by turning it into a list, but it only runs as many worker processes as you create for the pool; see the example below.

If the problem is that the iterable itself is long and consumes a lot of memory, then imap is a better choice, as it doesn't keep the whole iterable in memory; it just takes the next item and hands it to a worker. An added benefit is that imap returns the results directly (and in order), so they can be used by the main process.

After a worker completes its task, the memory is released, as can be verified with the example code thanks to the time delays.

Example:

import multiprocessing
import random
import time

def func(x):
    """
    1. Prints the process and input
    2. Waits a bit
    3. Uses a lot of memory
    4. Waits a random amount more
    """
    print(f'{multiprocessing.current_process()}: {x}')
    time.sleep(5)
    a = list(range(10000000))  # build a large list to occupy memory
    time.sleep(5 + random.randint(0, 5))

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    pool.map(func, range(10))

Output:

<ForkProcess(ForkPoolWorker-1, started daemon)>: 0
<ForkProcess(ForkPoolWorker-2, started daemon)>: 1
<ForkProcess(ForkPoolWorker-3, started daemon)>: 2
<ForkProcess(ForkPoolWorker-4, started daemon)>: 3
<ForkProcess(ForkPoolWorker-2, started daemon)>: 4
<ForkProcess(ForkPoolWorker-4, started daemon)>: 5
<ForkProcess(ForkPoolWorker-3, started daemon)>: 6
<ForkProcess(ForkPoolWorker-1, started daemon)>: 7
<ForkProcess(ForkPoolWorker-4, started daemon)>: 8
<ForkProcess(ForkPoolWorker-3, started daemon)>: 9
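
For the lazy side, here is a sketch of the same example switched to imap (same func as above; imap returns an iterator, so the main process can consume results one by one, in input order):

import multiprocessing
import random
import time

def func(x):
    print(f'{multiprocessing.current_process()}: {x}')
    time.sleep(5)
    a = list(range(10000000))  # occupy a lot of memory
    time.sleep(5 + random.randint(0, 5))
    return x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # results stream back in input order as workers finish
        for result in pool.imap(func, range(10)):
            print(f'finished: {result}')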
SimonF

I'm borrowing from Treddy's answer here:

You just need to edit the pool processes value. For example, like this:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))   # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))  # prints "[0, 1, 4,..., 81]"

In this example, there are 10 jobs to be done, but the pool maxes out at 4 worker processes. If you leave the processes value blank, it defaults to the number of CPUs on the machine.
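
For what it's worth, a minimal sketch of that default (with processes omitted, Pool starts os.cpu_count() workers):

from multiprocessing import Pool
import os

def f(x):
    return x * x

if __name__ == '__main__':
    print(os.cpu_count())  # how many workers Pool() will start
    with Pool() as pool:   # processes=None defaults to os.cpu_count()
        print(pool.map(f, range(10)))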

Does this help?

Daniel Scott
  • I'm trying to figure out why my answer was downvoted. I'll work on improving my answers in the future. – Daniel Scott Jan 05 '19 at 09:47
  • I could imagine that it was because it only repeats an existing answer. The proper reaction for duplicate questions is to close them as duplicate, though that option requires a certain reputation score. – Ulrich Eckhardt Jan 05 '19 at 11:02
  • Thanks Ulrich. I'll keep that in mind for when I can mark as duplicate. Cheers. – Daniel Scott Jan 05 '19 at 11:12