
This question has been asked and solved a few times recently but I have quite a specific example...

I have a multiprocessing function that was working absolutely fine in complete isolation yesterday (in an interactive notebook). However, I decided to parameterise it so I can call it as part of a larger pipeline (and for abstraction/a cleaner notebook), and now it only uses a single core instead of 6.

import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context
mp.set_start_method('forkserver')


def multiprocess_function(func, iterator, input_data):
    result_list = []

    def append_result(result):
        result_list.append(result)

    with get_context('fork').Pool(processes=6) as pool:
        for i in iterator:
            pool.apply_async(func, args = (i, input_data), callback = append_result)
        pool.close()
        pool.join()

    return result_list
multiprocess_function(count_live, run_weeks, base_df)

My previous version of the code executed differently: instead of a return/call, I was using the following at the bottom of the function (which doesn't work at all now that I've parameterised, even with the args assigned)

if __name__ == '__main__':
    multiprocess_function()

The function executes fine, it just only operates across one core, as per the output in `top`.

Apologies if this is something incredibly simple - I'm not a programmer, I'm an analyst :)

edit: everything works absolutely fine if I include the `if __name__ == '__main__':` etc. at the bottom of the function and execute the cell, however, when I do this I have to remove the parameters - maybe just something to do with scoping. If I execute by calling the function, whether it is parameterised or not, it only operates on a single core.

roastbeeef
  • why not use [`starmap`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap) instead of `apply_async`? You **are** calling the same function, so it eliminates the use of a `callback` or initializing a list. It reduces to something like: `result_list = pool.starmap(func, ((i, input_data) for i in iterator))` – Tomerikoo Jan 24 '20 at 12:06
  • 1) Ultimately, you changed too many things at once. Hopefully, you are using version control and can back-track to find out which change broke things. 2) "which doesn't work at all now I've parameterised - even with the args assigned" why not? it should be simple to use your new function in a similar way to the old function 3) If you were using a notebook then presumably it was Jupyter or some other iPython based system. Are you running your new script with iPython still? 4) Consider making a [minimal reproducible example](https://stackoverflow.com/help/minimal-reproducible-example). – FiddleStix Jan 24 '20 at 13:01
  • @FiddleStix gonna change the OP to explain a bit more... – roastbeeef Jan 24 '20 at 13:43
  • "when I do this I have to remove the parameters" -- The function as shown in the question *cannot* be called without parameters. Are you sure you are running the same version of the function? – MisterMiyagi Jan 24 '20 at 13:52
  • This might be related to the specific Windows issue where the main part is paramount. See https://stackoverflow.com/questions/20222534/python-multiprocessing-on-windows-if-name-main – Bram Vanroy Jan 24 '20 at 13:53
  • Does this answer your question? [python multiprocessing on windows, if \_\_name\_\_ == "\_\_main\_\_"](https://stackoverflow.com/questions/20222534/python-multiprocessing-on-windows-if-name-main) – Bram Vanroy Jan 24 '20 at 13:53
  • Does ``count_live`` actually run long enough for the first process to still be alive while the second, third, ... start? Does ``run_weeks`` actually contain more than one item? Why do you set the start method to ``forkserver``, but use a ``fork`` context? – MisterMiyagi Jan 24 '20 at 13:58
  • @roastbeeef: This isn't a [MCVE]; you're saying the `if __name__ == '__main__':` guard doesn't work because of the parameters, but you don't show where any of these parameters come from, so it wouldn't work as written regardless. The answer is to use the guard; [`forkserver` and `spawn` *require* you to use it](https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods). – ShadowRanger Nov 04 '20 at 06:09

2 Answers


You've got two problems:

  1. You're not using an import guard.

  2. You're not setting the default start method inside the import guard.

Between the two of them, you end up telling Python to spawn the forkserver inside the forkserver, which can only cause you grief. Change the structure of your code to:

import pandas as pd
import multiprocessing as mp
from multiprocessing import get_context


def multiprocess_function(func, iterator, input_data):
    result_list = []
    with get_context('fork').Pool(processes=6) as pool:
        for i in iterator:
            pool.apply_async(func, args=(i, input_data), callback=result_list.append)
        pool.close()
        pool.join()

    return result_list

if __name__ == '__main__':
    mp.set_start_method('forkserver')
    multiprocess_function(count_live, run_weeks, base_df)

Since you didn't show where you got count_live, run_weeks and base_df from, I'll just say that for the code as written, they should be defined in the guarded section (since nothing relies on them as globals).

There are other improvements to be made (apply_async is being used in a way that makes me think you really just wanted to listify the result of pool.imap_unordered, without the explicit loop), but that fixes the big issues that will wreck use of the spawn or forkserver start methods.

ShadowRanger

Using `get_context('spawn')` instead of `get_context('fork')` may solve your problem.

  • Leaving off the `if __name__ == '__main__':` guard is worse for `spawn` than for `fork` (because `spawn` imports the module as non-`__main__` to set up global state similar to how forking works). – ShadowRanger Nov 04 '20 at 06:07