I'm reviewing some code and noticed some possibly redundant code:

def tasker(val):
   do stuff

def multiprocessor (func, vals):
   chunks = np.array_split(vals, os.cpu_count())
   with multiprocessing.Pool() as pool:
      pool.map(partial(func,vals), chunksize=chunks)

if __name__ == '__main__':
   values = foobar
   p = multiprocessing.Process(target=multiprocessor(tasker,values))
   p.start()
   p.close()
   p.join()

Just for a sanity check - Is running multiprocessing.Process on the multiprocessing.Pool function not redundant? No need to functionalize the multiprocessing.Pool to begin with, correct? Is there any advantage of running it like this?

chicagobeast12

1 Answer

As it happens, the Process call never actually does anything useful; target=multiprocessor(tasker,values) is running multiprocessor in the main process, then passing its return value (None, since it has no explicit return) as the target for the Process.
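The difference between calling the function and passing it can be seen without starting any process at all. This is only a minimal sketch: `record` and `calls` are hypothetical stand-ins for `multiprocessor` and its side effects, and neither `Process` is ever started.

```python
import multiprocessing

calls = []  # hypothetical log of invocations, for illustration only


def record(label):
    """Stand-in for multiprocessor(): notes that it ran, returns None."""
    calls.append(label)


# Mistaken form: record("eager") runs right here, in the current process,
# and its return value (None) is what gets passed as the target.
p_wrong = multiprocessing.Process(target=record("eager"))

# Intended form: the function object and its arguments are passed separately,
# so nothing runs until p_right.start() is called (it is not called here).
p_right = multiprocessing.Process(target=record, args=("deferred",))

print(calls)  # only the eager call has happened at this point
```

Starting `p_wrong` would launch a child that has no target to run, which is the no-op described above.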

So yes, by definition, this is completely pointless: you make the Pool in the parent process, run it to completion, then create a no-op Process and launch it; it does nothing, and when the useless Process exits, the main process continues. Unless there is some benefit to creating such a no-op process, the code would do the same thing if the guarded block were just:

if __name__ == '__main__':
   values = foobar
   multiprocessor(tasker, values)

If the Process had been created correctly, with:

p = multiprocessing.Process(target=multiprocessor, args=(tasker, values))

and the code were more complex, there might be some benefit to this: if the Process needed to be killable (e.g. because some deadline had passed); if it would allocate huge amounts of memory that must be completely returned to the OS (not merely released to the user-mode free pool for reuse); or if you were trying to avoid any mutation of the main process's globals (if the Process's target mutated them, the changes would be seen only in that child process and any processes forked from it after the change; the parent would not see them).

As written, none of these conditions seem to apply (aside from maybe memory growth issues, especially due to the use of partial, which gets pickled along with its bound arguments when used as the mapper function with Pool's various map-like methods), but without knowing the contents of tasker (more specifically, what it returns, which Pool.map will collect and then discard, consuming memory that isn't strictly needed only to free it in bulk at the end), I can't be sure.


An aside:

I'll note your code as written makes no sense:

def multiprocessor (func, vals):
   chunks = np.array_split(vals, os.cpu_count())
   with multiprocessing.Pool() as pool:
      pool.map(partial(func,vals), chunksize=chunks)

doesn't provide an iterable to pool.map, and passes chunks (a list of NumPy sub-arrays) as the chunksize, which should be an int.

The additional comments below assume it was actually implemented as:

def multiprocessor (func, vals):
   chunks = np.array_split(vals, os.cpu_count())
   with multiprocessing.Pool() as pool:
      pool.map(func, chunks, chunksize=1)

or:

def multiprocessor (func, vals):
   chunk_size = -(-len(vals) // os.cpu_count())  # Trick to get ceiling division out of floor division operator
   with multiprocessing.Pool() as pool:
      pool.map(func, vals, chunksize=chunk_size)
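To see why the double negation yields a ceiling, here is a small sketch comparing it against `math.ceil`. The helper name `ceil_div` is made up for illustration; it is not part of the code above.

```python
import math


def ceil_div(n, k):
    """Ceiling of n / k using only floor division (hypothetical helper)."""
    # -n // k floors toward negative infinity, so negating the result
    # rounds the original quotient up instead of down.
    return -(-n // k)


for n, k in [(10, 4), (8, 4), (1, 4)]:
    # Cross-check against the floating-point ceiling for these small values.
    assert ceil_div(n, k) == math.ceil(n / k)
    print(n, k, "floor:", n // k, "ceiling:", ceil_div(n, k))
```

With 10 items and 4 workers, plain floor division would give a chunk size of 2 and leave work for a fifth chunk, whereas the ceiling gives 3, so the items fit in at most 4 chunks.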

Having said that, the possible memory issue from Pool.map storing all the results when they're clearly discarded can be ameliorated by using Pool.imap_unordered instead, and just forcing the resulting iterator to run to completion efficiently. For example, you could replace pool.map(func, chunks, chunksize=1) with consume(pool.imap_unordered(func, chunks)) and pool.map(func, vals, chunksize=chunk_size) with consume(pool.imap_unordered(func, vals, chunksize=chunk_size)) (where consume is the itertools recipe of the same name).

In both cases, rather than allocating a list for all the results, storing each result in it as the workers complete tasks (allocating more and more stuff you don't need), imap_unordered produces each result as it's returned, and consume immediately grabs each result and throws it away (memory must be allocated for each result, but it's immediately released, so the peak memory consumption for the process, and therefore the size the heap grows to, is kept minimal).
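A rough sketch of the pattern follows. It makes two assumptions: `consume` is written out as the `n=None` case of the itertools recipe, and `multiprocessing.pool.ThreadPool` stands in for `multiprocessing.Pool` (it offers the same `imap_unordered` method) purely so that the example runs without spawning worker processes. The `tasker` body and the `seen` list are hypothetical.

```python
from collections import deque
from multiprocessing.pool import ThreadPool


def consume(iterator):
    """Exhaust an iterator, discarding every item as it arrives."""
    # A deque with maxlen=0 pulls each item and immediately drops it.
    deque(iterator, maxlen=0)


seen = []  # hypothetical side-effect log; works here because threads share memory


def tasker(val):
    seen.append(val)  # side effect standing in for the real work
    return val * 2    # return value that nobody needs


with ThreadPool(4) as pool:
    # Results are yielded in completion order and dropped one at a time,
    # so no list holding all of the results is ever built.
    consume(pool.imap_unordered(tasker, range(10)))

print(sorted(seen))
```

With a real `multiprocessing.Pool`, the call shape is the same, but `tasker` would need to be picklable and a shared list like `seen` would not be visible to the parent.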

ShadowRanger
  • Thank you, I couldn't understand why it was built like this and maybe there was actually some advantage that I was missing... but thanks for confirming that it's not necessary – chicagobeast12 Nov 29 '22 at 17:41
  • @chicagobeast12: I edited in some hypothetical cases where it *might* do something useful if the `Process` was constructed correctly, but yeah, as written, with the incorrectly constructed `Process`, it couldn't possibly derive any of those benefits (since it's not actually doing the work in a separate process). – ShadowRanger Nov 29 '22 at 17:48
  • I didn't mean to add chunksize=chunks in the original post, it should've just been pool.map(partial(func,vals), chunks). The tasker function is actually running a multithreader on a function that's only performing writes... I do appreciate the advice against using partial (I wasn't aware that it pickles the underlying arguments as well - could be a reason as to why I was running into memory issues I've had in the past). Apologies for my ignorance, but are you saying that map is storing 'results' even though my tasker function isn't returning anything? – chicagobeast12 Nov 29 '22 at 18:42
  • @chicagobeast12: If `tasker` returns nothing explicitly, then `pool.map` will just store a `list` of `None`s matching the number of inputs from the iterable; when all the tasks have finished, it will return said `list`, and since you don't assign it to anything, the `list` is freed at that point. Not a huge deal assuming the number of inputs isn't ridiculous (a `list` of 1M `None`s would only cost 8-10 MB on a 64 bit Python). The concern is that, if the `list` is not just `None`s, building it might allocate a lot of memory that might not be returned to the OS when the `list` goes away. – ShadowRanger Nov 29 '22 at 19:11
  • Most allocators either never return their small object heaps to the OS, or only do so rarely (e.g. if literally every object in a large chunk of the heap has been freed). So making a bunch of small objects occupying hundreds of MB might mean the process holds on to that memory (reusing it for new allocations, but not giving it back to the OS until the process exits). If the process that performs these allocations is a child process, it prevents the parent process from bloating like that, that's all. It's rarely a big deal, but it comes up now and again. – ShadowRanger Nov 29 '22 at 19:14
  • You said "imap_unordered produces each result as it's returned, and consume immediately grabs each result and throws it away"... does this mean if my tasker was actually returning something, I couldn't use imap/consume because I need to access the returned data? – chicagobeast12 Nov 29 '22 at 19:28
  • @chicagobeast12: It's `consume` that grabs them and throws them away. If you actually wanted the results, you could write a `for` loop over the iterator returned by `imap_unordered` just fine, and if your loop processes the results, without storing them, it would get similar memory savings (it wouldn't be making a `list` of *all* the results). – ShadowRanger Nov 29 '22 at 19:45
  • @ShadowRanger, your last block of code works as intended. However, it was running significantly slower in my case, and I found the problem: it was only running multiprocessing, whereas my original solution also leverages multithreading, i.e. pool.map(partial(multithreader, func), chunks). So I need to pass the tasker function to the multithreader and then pass that to the pool. To accomplish this, I was using partial, but if you see a more efficient way without using partial, I'd love to see it. Thanks! – chicagobeast12 Dec 06 '22 at 18:14
  • @chicagobeast12: It's not a great idea to mix threads and processes; CPython barely benefits from CPU bound threads in any event, and if you're already using multiple processes, you may as well just use the multiple processes to split up the work. The cost of `pool.map`ing `partial(multithreader, func)` is going to depend on the cost of serializing `multithreader` and `func`; you might try computing a rough overhead estimate with `len(pickle.dumps((multithreader, func), pickle.HIGHEST_PROTOCOL))`. If it's in the low KB range or below, it's probably fine, but if it's huge, it's a problem. – ShadowRanger Dec 06 '22 at 19:34