
I am trying to parallelize an embarrassingly parallel for loop (previously asked here) and settled on this implementation that fit my parameters:

    with Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets, train_size_common, feat_sel_size, train_perc,
                                            total_test_samples, num_classes, num_features, label_set,
                                            method_names, pos_class_index, out_results_dir, exhaustive_search])
        partial_func_holdout = partial(holdout_trial_compare_datasets, *shared_inputs)

        with Pool(processes=num_procs) as pool:
            cv_results = pool.map(partial_func_holdout, range(num_repetitions))

The reason I need a proxy object (shared between processes) is that the first element of the shared proxy list, datasets, is itself a list of large objects (each about 200-300MB). The datasets list usually has 5-25 elements. I typically need to run this program on an HPC cluster.

Here is the question: when I run this program with 32 processes and 50GB of memory (num_repetitions=200, with datasets being a list of 10 objects, each 250MB), I do not see a speedup of even a factor of 16. I do not understand why. Any clues? Any obvious mistakes or bad choices? Where can I improve this implementation? Are there any alternatives?

I am sure this has been discussed before, and the reasons can be varied and very specific to implementation - hence I request you to provide me your 2 cents. Thanks.

Update: I did some profiling with cProfile to get a better idea - here is some info, sorted by cumulative time.

In [19]: p.sort_stats('cumulative').print_stats(50)
Mon Oct 16 16:43:59 2017    profiling_log.txt

         555404 function calls (543552 primitive calls) in 662.201 seconds

   Ordered by: cumulative time
   List reduced from 4510 to 50 due to restriction <50>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    897/1    0.044    0.000  662.202  662.202 {built-in method builtins.exec}
        1    0.000    0.000  662.202  662.202 test_rhst.py:2(<module>)
        1    0.001    0.001  661.341  661.341 test_rhst.py:70(test_chance_classifier_binary)
        1    0.000    0.000  661.336  661.336 /Users/Reddy/dev/neuropredict/neuropredict/rhst.py:677(run)
        4    0.000    0.000  661.233  165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:533(wait)
        4    0.000    0.000  661.233  165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:263(wait)
       23  661.233   28.749  661.233   28.749 {method 'acquire' of '_thread.lock' objects}
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:261(map)
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:637(get)
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:634(wait)
    866/8    0.004    0.000    0.868    0.108 <frozen importlib._bootstrap>:958(_find_and_load)
    866/8    0.003    0.000    0.867    0.108 <frozen importlib._bootstrap>:931(_find_and_load_unlocked)
    720/8    0.003    0.000    0.865    0.108 <frozen importlib._bootstrap>:641(_load_unlocked)
    596/8    0.002    0.000    0.865    0.108 <frozen importlib._bootstrap_external>:672(exec_module)
   1017/8    0.001    0.000    0.863    0.108 <frozen importlib._bootstrap>:197(_call_with_frames_removed)
   522/51    0.001    0.000    0.765    0.015 {built-in method builtins.__import__}

The profiling info now sorted by time

In [20]: p.sort_stats('time').print_stats(20)
Mon Oct 16 16:43:59 2017    profiling_log.txt

         555404 function calls (543552 primitive calls) in 662.201 seconds

   Ordered by: internal time
   List reduced from 4510 to 20 due to restriction <20>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       23  661.233   28.749  661.233   28.749 {method 'acquire' of '_thread.lock' objects}
   115/80    0.177    0.002    0.211    0.003 {built-in method _imp.create_dynamic}
      595    0.072    0.000    0.072    0.000 {built-in method marshal.loads}
        1    0.045    0.045    0.045    0.045 {method 'acquire' of '_multiprocessing.SemLock' objects}
    897/1    0.044    0.000  662.202  662.202 {built-in method builtins.exec}
        3    0.042    0.014    0.042    0.014 {method 'read' of '_io.BufferedReader' objects}
2037/1974    0.037    0.000    0.082    0.000 {built-in method builtins.__build_class__}
      286    0.022    0.000    0.061    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:12(docformat)
     2886    0.021    0.000    0.021    0.000 {built-in method posix.stat}
       79    0.016    0.000    0.016    0.000 {built-in method posix.read}
      597    0.013    0.000    0.021    0.000 <frozen importlib._bootstrap_external>:830(get_data)
      276    0.011    0.000    0.013    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/sre_compile.py:250(_optimize_charset)
      108    0.011    0.000    0.038    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/stats/_distn_infrastructure.py:626(_construct_argparser)
     1225    0.011    0.000    0.050    0.000 <frozen importlib._bootstrap_external>:1233(find_spec)
     7179    0.009    0.000    0.009    0.000 {method 'splitlines' of 'str' objects}
       33    0.008    0.000    0.008    0.000 {built-in method posix.waitpid}
      283    0.008    0.000    0.015    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:128(indentcount_lines)
        3    0.008    0.003    0.008    0.003 {method 'poll' of 'select.poll' objects}
     7178    0.008    0.000    0.008    0.000 {method 'expandtabs' of 'str' objects}
      597    0.007    0.000    0.007    0.000 {method 'read' of '_io.FileIO' objects}

More profiling info: profiling info sorted by percall

Update 2

The elements in the large list datasets I mentioned earlier are not usually that big; they are typically 10-25MB each. But depending on the floating point precision used and the number of samples and features, this can easily grow to 500MB-1GB per element. Hence I'd prefer a solution that can scale.

Update 3:

The code inside holdout_trial_compare_datasets uses scikit-learn's GridSearchCV, which internally uses the joblib library if we set n_jobs > 1 (or whenever we set it at all). This might lead to some bad interactions between multiprocessing and joblib. So I am trying another config where I do not set n_jobs at all (which should default to no parallelism within scikit-learn). Will keep you posted.

3 Answers


Based on the discussion in the comments, I ran a mini experiment comparing three versions of the implementation:

  • v1: basically the same as your approach. In fact, because partial(f1, *shared_inputs) unpacks the proxy list immediately, the Manager list is not involved here; the data is passed to the workers through Pool's internal queue.
  • v2: uses the Manager list. The work function receives a ListProxy object and fetches the shared data through an internal connection to the manager's server process.
  • v3: child processes share data with the parent, taking advantage of the fork(2) system call.

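import multiprocessing as mp
from functools import partial
from time import time

def f1(*args):
    # args[0] is the plain datasets list, already unpacked from the proxy
    for e in args[0]: pow(e, 2)

def f2(*args):
    # args[0] is the ListProxy; args[0][0] fetches datasets from the manager
    for e in args[0][0]: pow(e, 2)

def f3(n):
    # datasets is a module-level global inherited from the parent via fork
    for i in datasets: pow(i, 2)

def v1(num_procs):
    with mp.Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets,])
        pf = partial(f1, *shared_inputs)
        with mp.Pool(processes=num_procs) as pool:
            r = pool.map(pf, range(16))

def v2(num_procs):
    with mp.Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets,])
        pf = partial(f2, shared_inputs)
        with mp.Pool(processes=num_procs) as pool:
            r = pool.map(pf, range(16))

def v3(num_procs):
    with mp.Pool(processes=num_procs) as pool:
        r = pool.map(f3, range(16))

# v3 relies on the fork start method, so datasets must exist before the pool starts
datasets = [2.0 for _ in range(10 * 1000 * 1000)]
for f in (v1, v2, v3):
    print(f.__name__)
    for num_procs in (2, 4, 8, 16):
        s = time()
        f(num_procs)
        print("%s %.2fs" % (num_procs, time() - s))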

Results were taken on a 16-core E5-2682 VPC; v3 clearly scales better (figure: result).
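A variation on v3 that may be easier to drop into the original code is to hand the large data to each worker once, through the pool initializer, instead of per task. This is only a sketch under assumptions: `init_worker`, `trial`, and `run_trials` are hypothetical names, `trial` stands in for holdout_trial_compare_datasets, and the "fork" start method (POSIX only) is requested explicitly so that workers inherit the data from the parent rather than receiving a pickled copy.

```python
import multiprocessing as mp

# Module-level slot that each worker fills once, at start-up.
_DATASETS = None


def init_worker(datasets):
    """Runs once per worker process and stores the data in a global."""
    global _DATASETS
    _DATASETS = datasets


def trial(rep_index):
    """Hypothetical stand-in for holdout_trial_compare_datasets."""
    # Read-only access to the worker-local global; only rep_index is
    # sent through the pool's task queue.
    return rep_index, sum(len(d) for d in _DATASETS)


def run_trials(datasets, num_repetitions, num_procs):
    # Assumption: "fork" is available (Linux and most Unix systems).
    ctx = mp.get_context("fork")
    with ctx.Pool(processes=num_procs,
                  initializer=init_worker,
                  initargs=(datasets,)) as pool:
        return pool.map(trial, range(num_repetitions))


if __name__ == "__main__":
    data = [[0.0] * 1000 for _ in range(10)]
    print(run_trials(data, num_repetitions=8, num_procs=4)[:2])
```

Each task then carries only the repetition index. One caveat: CPython updates reference counts even on read-only access, so some of the inherited pages can still be copied over time.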

georgexsh
  • Thanks George - really helps me get closer to understanding the problem. I am really curious about `*shared_inputs` unpacking immediately - I didn't realize that happens. A similar thing would happen when we apply `functools.partial` too, as it tries to freeze the inputs and signature, right? So with or without unpacking, data must have been passed to the child processes via pickling, right? That means the only way we can avoid it is by using them as global variables and accessing them from child processes? Any dangers with this method? No changes to the input will be made within child processes. – Pradeep Reddy Raamana Oct 25 '17 at 01:15
  • Because `*shared_inputs` unpacks the list before `partial` takes effect, it "freezes" the objects that the (proxy) list contains, not the proxy itself; v2 is an example of using a Manager list with partial. Yes, the data gets pickled/unpickled to be passed around. Within the context of your problem and workload, there is nothing to worry about; if you pass something associated with external resources, like a socket object or file descriptor, you need to be more careful. – georgexsh Oct 25 '17 at 08:40
  • Thanks George for your help - I feel like we need to better establish the bottlenecks, esp. with large objects (10 elements of 20MB) and complex calculations (something more demanding than squaring a regular grid of numbers, as that might trigger internal memoizations). Your comments have been greatly helpful and I am hoping to nail this. – Pradeep Reddy Raamana Oct 29 '17 at 17:12
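The immediate unpacking discussed in these comments can be checked directly by inspecting what `partial` stores. The following is a small sketch, not part of the original experiment: `work` and `frozen_arg_types` are hypothetical names, and the "fork" context is an assumption (POSIX only) made to keep the example short.

```python
import multiprocessing as mp
from functools import partial


def work(*args):
    """Hypothetical worker; it only reports how many arguments it got."""
    return len(args)


def frozen_arg_types(big_list):
    """Return the type names that partial() stores for each calling style."""
    ctx = mp.get_context("fork")  # assumption: POSIX start method
    with ctx.Manager() as manager:
        shared = manager.list([big_list])
        # Star-unpacking reads the items out of the proxy right here,
        # so partial() stores ordinary objects.
        unpacked = partial(work, *shared)
        # Without the star, partial() stores the proxy object itself.
        wrapped = partial(work, shared)
        return (type(unpacked.args[0]).__name__,
                type(wrapped.args[0]).__name__)


if __name__ == "__main__":
    print(frozen_arg_types([1.0, 2.0, 3.0]))
```

On CPython the first form should report a plain list and the second a ListProxy. That matches the explanation above: with `*shared_inputs`, the pool pickles the data itself along with the partial function.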
0
{method 'acquire' of '_thread.lock' objects}

Looking at your profiler output, I would say that the shared object lock/unlock overhead overwhelms the speed gains of multiprocessing.

Refactor so that the work is farmed out to workers that do not need to talk to one another as much.

Specifically, if possible, derive one answer per data pile and then act on the accumulated results.

This is why Queues can seem so much faster: they involve a type of work that does not require an object that has to be 'managed' and so locked/unlocked.

Only 'manage' things that absolutely need to be shared between processes. Your managed list contains some very complicated looking objects...

A faster paradigm is:

allwork = manager.list([a, b, c])
theresult = manager.list()

and then

while len(allwork) > 0:
    unitofwork = allwork.pop()
    theresult.append(myfunction(unitofwork))
  • Thanks for the suggestions and response. In fact, the work in different processes does NOT need to talk to each other at all, i.e. once they access the shared list of data `shared_inputs`, everything inside a unit of work is independent of the others. – Pradeep Reddy Raamana Oct 24 '17 at 02:23
  • And I am not sure how my implementation differs from your suggestion. Could you try elaborating more, or perhaps write a more detailed implementation that can be swapped in place of what I show at the top? That'd be great, thank you. – Pradeep Reddy Raamana Oct 24 '17 at 02:24
0

If you do not need a complex shared object, then only use a list of the most simple objects imaginable.

Then tell the workers to acquire the complex data that they can process in their own little world.

Try:

def myfunction(unitofworkid):
    # each worker loads the complex data itself, from its id
    thework = acquiredataset(unitofworkid)
    return holdout_trial_compare_datasets(thework, ...)

allwork = manager.list([datasetid1, datasetid2, ...])
theresult = manager.list()

while len(allwork) > 0:
    unitofworkid = allwork.pop()
    theresult.append(myfunction(unitofworkid))

I hope that this makes sense. It should not take too much time to refactor in this direction. And you should see that {method 'acquire' of '_thread.lock' objects} number drop like a rock when you profile.
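A profile taken in the parent process mostly records the time it spends blocked in pool.map waiting for results, so it can help to profile inside the workers as well. The sketch below shows one possible way to do that with the standard cProfile and pstats modules. `trial`, `profiled_trial`, and `run_profiled` are hypothetical names, `trial` stands in for the real per-repetition work, and the "fork" context is an assumption (POSIX only).

```python
import cProfile
import io
import multiprocessing as mp
import pstats


def trial(rep_index):
    """Hypothetical stand-in for the real per-repetition work."""
    return sum(i * i for i in range(20000 + rep_index))


def profiled_trial(rep_index):
    """Run one task under cProfile inside the worker; return a text report."""
    profiler = cProfile.Profile()
    result = profiler.runcall(trial, rep_index)
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats("cumulative").print_stats(5)
    return result, buffer.getvalue()


def run_profiled(num_repetitions, num_procs):
    ctx = mp.get_context("fork")  # assumption: POSIX start method
    with ctx.Pool(processes=num_procs) as pool:
        return pool.map(profiled_trial, range(num_repetitions))


if __name__ == "__main__":
    for result, report in run_profiled(num_repetitions=2, num_procs=2):
        print(result)
        print(report)
```

Comparing the per-task reports with the wall-clock time of the whole run gives a rough idea of how much time goes into computation and how much into moving data between processes.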

  • Thanks, I need to think about it. This would lead to tremendous I/O in my situation - but need to think about if it’s only very small fraction of time compared to actual parallel computation.. – Pradeep Reddy Raamana Oct 24 '17 at 12:17
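One possible way to limit the I/O concern raised in this comment is to cache loaded datasets inside each worker, so that a given file is read once per worker process rather than once per repetition. The following is a sketch under assumptions: `load_dataset`, `trial`, `run_trials`, and `write_demo_datasets` are hypothetical names, the datasets are assumed to be stored as pickle files, and the "fork" context is POSIX only.

```python
import multiprocessing as mp
import os
import pickle
import tempfile
from functools import lru_cache


@lru_cache(maxsize=None)
def load_dataset(path):
    """Read one dataset from disk; cached, so a worker reads each path once."""
    with open(path, "rb") as handle:
        return pickle.load(handle)


def trial(task):
    """Hypothetical stand-in for holdout_trial_compare_datasets."""
    rep_index, paths = task
    datasets = [load_dataset(p) for p in paths]
    return rep_index, sum(len(d) for d in datasets)


def run_trials(paths, num_repetitions, num_procs):
    # Only small (index, paths) tuples travel through the task queue.
    tasks = [(rep, tuple(paths)) for rep in range(num_repetitions)]
    ctx = mp.get_context("fork")  # assumption: POSIX start method
    with ctx.Pool(processes=num_procs) as pool:
        return pool.map(trial, tasks)


def write_demo_datasets(directory, count):
    """Create small pickle files that play the role of the real datasets."""
    paths = []
    for i in range(count):
        path = os.path.join(directory, "dataset_%d.pkl" % i)
        with open(path, "wb") as handle:
            pickle.dump([float(i)] * 100, handle)
        paths.append(path)
    return paths


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        demo_paths = write_demo_datasets(tmp, 3)
        print(run_trials(demo_paths, num_repetitions=6, num_procs=2))
```

With 200 repetitions spread over 32 workers, each worker would then read each file once and reuse it for the remaining repetitions it handles. Every worker still holds its own copy in memory.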