
I am using SCOOP (and Python 3.6, which cannot be updated) for my work. I need all workers to perform a computation, then wait for the root node to execute a slow computation (the code in an `if __name__ == '__main__':` block), then perform another computation with a dataframe resulting from the root node's computation.

My problem is that SCOOP starts all workers right away, and they try to run all code outside the `if __name__ == '__main__':` block asynchronously, even if it is below the `if` block. Since there is no dataframe yet, they throw an error.

What command can force all workers to wait for the root worker to complete a computation before continuing to run the rest of the code?


I have tried experimenting with `scoop.futures.map`, `scoop.futures.supply` and `multiprocessing.managers` without success. I have also tried using `multiprocessing.Barrier(8).wait()`, but it does not work.

There is a `scoop.futures.wait(futures)` method, but I do not know how to get the `futures` argument...
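From the docs it looks like SCOOP mirrors the `concurrent.futures` API, so I assume the futures would come from `futures.submit`; a minimal sketch of my understanding (`slow_task` is a hypothetical function), though as far as I can tell this only blocks the root process, not the workers' module-level code:

from scoop import futures

def slow_task(x):
    return x * 2

if __name__ == '__main__':
    fs = [futures.submit(slow_task, i) for i in range(8)]
    done, not_done = futures.wait(fs)  # blocks until all submitted futures finish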

I have something like:

import pandas as pd
import genetic_algorithm
from scoop import futures

df = pd.read_csv('database.csv') # dataframe is too large to be passed to fitness_function for every worker. I want every worker to have a copy of it!

if __name__ == '__main__':
    df = add_new_columns(df) # heavy computation which I just want to perform once (not by all workers)

df = computation_using_new_columns(df) # <--- !!! error - is executed before slow add_new_columns(df) finishes

def fitness_function(): ... # all workers use fitness_function() and an error is thrown if I put it inside the if __name__ == '__main__':

if __name__ == '__main__':
    results = list(futures.map(genetic_algorithm, df))

and execute the script with `python3 -m scoop script.py`, which starts all workers right away...

João Bravo
  • maybe make a [minimal reproducible example](https://stackoverflow.com/help/minimal-reproducible-example) of the problem? the current state of the question cannot be answered without knowing what each of those functions does; still, they should all be inside the `if __name__ == "__main__":` clause – Ahmed AEK Jan 25 '23 at 17:06
  • @AhmedAEK I think I need them out of the `if` because, if I put them all inside, it throws an error that functions defined inside the `if` are not defined for some workers. – João Bravo Jan 25 '23 at 17:10
  • it's hard to tell from your description of the problem what should and shouldn't be done; you need a minimal reproducible example. – Ahmed AEK Jan 25 '23 at 17:11
  • I've made it more explicit; it is a complex genetic algorithm, so it is hard to reproduce – João Bravo Jan 25 '23 at 17:23
  • looking into the `SCOOP` framework I don't see any tools to make this work; perhaps you could use a more flexible tool like Python's builtin `concurrent.futures` module. I could write an answer for that. – Ahmed AEK Jan 25 '23 at 17:34
  • If it works, that would be just fine – João Bravo Jan 25 '23 at 17:35

1 Answer


Each process has its own memory space, so modifying the dataframe in the main process doesn't affect the workers. You need to pass it to the workers using some sort of initializer after it is processed, which doesn't seem to be available in the SCOOP framework; a more flexible (but slightly more complicated) tool is Python's builtin `multiprocessing.Pool`.

import pandas as pd
import genetic_algorithm
from multiprocessing import Pool

def fitness_function(): ...

def initializer_func(df_from_parent):
    # runs once in each worker process when the pool starts
    global df
    df = df_from_parent
    df = computation_using_new_columns(df)

if __name__ == '__main__':
    # read the df in the main process only, as it needs to be modified
    # before sending it to the workers
    df = pd.read_csv('database.csv')

    df = add_new_columns(df)  # modify the df in the main process

    # create as many workers as your CPU has cores, pass the df to them,
    # and have each worker execute computation_using_new_columns on it
    with Pool(initializer=initializer_func, initargs=(df,)) as pool:
        results = list(pool.imap(genetic_algorithm, df))  # now do your function

If `computation_using_new_columns` needs to execute in each worker, then you can keep it in the initializer; but if it only needs to execute once, then you can put it after `add_new_columns` inside the `if __name__ == "__main__":` block.
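For the second case, a minimal sketch (same hypothetical `add_new_columns` / `computation_using_new_columns` helpers as in the question):

def initializer_func(df_from_parent):
    # each worker just stores the fully processed dataframe
    global df
    df = df_from_parent

if __name__ == '__main__':
    df = pd.read_csv('database.csv')
    df = add_new_columns(df)
    df = computation_using_new_columns(df)  # runs only once, in the main process
    with Pool(initializer=initializer_func, initargs=(df,)) as pool:
        results = list(pool.imap(genetic_algorithm, df))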

Ahmed AEK
  • Thank you! I actually have two points at which I invoke the pool.map parallelization method. Should I open a `with ProcessPoolExecutor(initializer=initializer_func, initargs=(df,)) as pool:` block before each call, or should I just put all the code in between the `pool.map` calls in the `with` block? In other words, is it inefficient to open a pool twice? – João Bravo Jan 27 '23 at 10:25
  • @JoãoBravo no, you shouldn't open the pool twice; I'd recommend you instead save it to a variable, `pool = ProcessPoolExecutor(initializer=initializer_func, initargs=(df,))`, reuse it in both places and just let Python figure out the lifetimes, or just put all the code inside the `with` block (see the sketch after these comments). – Ahmed AEK Jan 27 '23 at 10:29
  • Got it! Meanwhile, python is throwing this error: `Traceback (most recent call last):` `File "script.py", line 512, in ` `with ProcessPoolExecutor(initializer=get_dataframe_for_GA, initargs=(df,)) as pool:` `TypeError: __init__() got an unexpected keyword argument 'initializer'` – João Bravo Jan 27 '23 at 10:57
  • the method signature is just `ProcessPoolExecutor(max_workers=None)` – João Bravo Jan 27 '23 at 10:59
  • @JoãoBravo you are most likely using a very old version of Python (over 5 years old); can you update it to at least `3.7`? Otherwise you'll have to implement one ... – Ahmed AEK Jan 27 '23 at 11:14
  • It is a work environment with Python 3.6 that cannot be updated :( How would I implement one? – João Bravo Jan 27 '23 at 11:24
  • @JoãoBravo First, I updated the answer to use `multiprocessing.Pool` instead; see if it works for you. – Ahmed AEK Jan 27 '23 at 11:27
  • Thank you very much. It is fully working! I've marked you as correct. – João Bravo Jan 27 '23 at 11:54
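For reference, a minimal sketch of the pool-reuse pattern from the comments above (the second stage is hypothetical):

if __name__ == '__main__':
    df = pd.read_csv('database.csv')
    df = add_new_columns(df)
    with Pool(initializer=initializer_func, initargs=(df,)) as pool:
        # reuse the same pool for both parallel stages instead of paying
        # the worker start-up (and initializer) cost twice
        stage_one = list(pool.imap(genetic_algorithm, df))
        stage_two = list(pool.imap(genetic_algorithm, stage_one))  # hypothetical second stage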