
Many scikit-learn functions come with built-in, user-friendly parallelization. For example, in sklearn.cross_validation.cross_val_score you just pass the desired number of computational jobs via the n_jobs argument, and on a PC with a multi-core processor it works very nicely. But what if I want to use this option on a high-performance cluster (with the OpenMPI package installed and SLURM for resource management)? As far as I know, sklearn uses joblib for parallelization, which in turn uses multiprocessing. And, as I know (from, for example, Python multiprocessing within mpi), Python programs parallelized with multiprocessing are easy to scale over a whole MPI architecture with the mpirun utility. Can I spread the computation of sklearn functions over several computational nodes just by using mpirun and the n_jobs argument?
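For concreteness, here is a minimal sketch of the single-machine behaviour I mean (using the newer sklearn.model_selection import path; the estimator and n_jobs=2 are arbitrary choices):

```python
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

X, y = load_iris(return_X_y=True)

# joblib fans the 5 folds out over 2 local worker processes
scores = cross_val_score(LogisticRegression(max_iter=1000), X, y, cv=5, n_jobs=2)
print(len(scores))  # 5 scores, one per fold
```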

user3271237
  • You might want to check dask-sklearn with the distributed scheduler, that can run in a cluster: http://jcrist.github.io/dask-sklearn-part-1.html – dukebody Aug 07 '16 at 10:28
  • @dukebody can you post an example for using the distributed scheduler? The distributed dask examples I've seen involve manually creating workers on each machine and assigning them to the scheduler. I'm not sure I see how this ties in to the dask-sklearn functions. Would I just create the scheduler and workers like here: http://dask.pydata.org/en/doc-test-build/distributed.html then set the default scheduler like in your link (where 10.0.0.3:8786 is the address of the scheduler I created like in the first link)? – IVlad Aug 07 '16 at 11:52
  • Yes. The setup process is exactly as you describe. See http://distributed.readthedocs.io/en/latest/setup.html – MRocklin Aug 07 '16 at 12:29
  • @MRocklin that doesn't seem to work for me. It seems that nothing gets executed on the workers, although they are successfully created. Can you read the answer below and my comments to it and see if you have any ideas please? – IVlad Aug 08 '16 at 18:23

1 Answer


SKLearn manages its parallelism with Joblib. Joblib can swap out the multiprocessing backend for other distributed systems like dask.distributed or IPython Parallel. See this issue on the sklearn github page for details.

Example using Joblib with Dask.distributed

Code taken from the issue page linked above.

from sklearn.externals.joblib import parallel_backend
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV

digits = load_digits()

# model and param_space are placeholders for your estimator and its parameter distributions
search = RandomizedSearchCV(model, param_space, cv=10, n_iter=1000, verbose=1)

with parallel_backend('dask', scheduler_host='your_scheduler_host:your_port'):
    search.fit(digits.data, digits.target)

This requires that you set up a dask.distributed scheduler and workers on your cluster. General instructions are available here: http://dask.readthedocs.io/en/latest/setup.html
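If you are starting things by hand rather than through a cluster manager, the setup is roughly as follows (the hostname below is a placeholder):

```shell
# on the coordinating node: start the scheduler (listens on port 8786 by default)
dask-scheduler

# on every worker node: point a worker at the scheduler's address
dask-worker tcp://scheduler-host:8786
```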

Example using Joblib with ipyparallel

Code taken from the same issue page.

from sklearn.externals.joblib import Parallel, parallel_backend, register_parallel_backend

from ipyparallel import Client
from ipyparallel.joblib import IPythonParallelBackend
from sklearn.datasets import load_digits

digits = load_digits()

c = Client(profile='myprofile')
print(c.ids)
bview = c.load_balanced_view()

# this is taken from the ipyparallel source code
register_parallel_backend('ipyparallel',
                          lambda: IPythonParallelBackend(view=bview))

...

with parallel_backend('ipyparallel'):
    search.fit(digits.data, digits.target)

Note: in both of the above examples, the n_jobs parameter no longer seems to matter.

Set up dask.distributed with SLURM

For SLURM the easiest way to do this is probably to use the dask-jobqueue project:

>>> from dask_jobqueue import SLURMCluster
>>> cluster = SLURMCluster(project='...', queue='...', ...)
>>> cluster.scale(20)
>>> from dask.distributed import Client
>>> client = Client(cluster)  # connect so subsequent dask/joblib work runs on the SLURM workers

You could also use dask-mpi or any of the several other methods mentioned in Dask's setup documentation.
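For completeness, a rough sketch of the dask-mpi route (flag names may differ between versions, and scheduler.json is just a conventional name for the shared scheduler file):

```shell
# launch one scheduler and three workers across 4 MPI ranks
mpirun -np 4 dask-mpi --scheduler-file scheduler.json

# your client script then connects through the shared file, e.g.:
#   from dask.distributed import Client
#   client = Client(scheduler_file='scheduler.json')
```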

Use dask.distributed directly

Alternatively you can set up a dask.distributed or IPyParallel cluster and then use those interfaces directly to parallelize your SKLearn code. Here is an example video of SKLearn and Joblib developer Olivier Grisel doing exactly that at PyData Berlin: https://youtu.be/Ll6qWDbRTD0?t=1561

Try Dask-ML

You could also try the Dask-ML package, which has a RandomizedSearchCV object that is API-compatible with scikit-learn but implemented computationally on top of Dask:

https://github.com/dask/dask-ml

pip install dask-ml
MRocklin
  • I'm trying to get the first example working, the one also described here: http://distributed.readthedocs.io/en/latest/joblib.html. I used `dask-ssh` to set up my scheduler and workers. That works fine; if I print the scheduler object I get the right number of cores (`240`). Next, I wrapped the call to the randomized search's `fit` in the `with` statement. If I look in the console window where I executed `dask-ssh`, I see a connection from the node I run the Python script in. However, there is no distributed work going on. It doesn't scale, and it doesn't even see the GPUs that the workers have. – IVlad Aug 07 '16 at 20:21
  • I also tried tinkering with RandomizedSearchCV's `n_jobs` parameter, setting it to `-1`, `1`, `100`, `240`. Each value above `20` leads to about the same performance, which makes me think that nothing is actually running on the distributed workers but rather on the node I run the Python script on (gensim also prints a message that there is no GPU. There is a GPU on the worker nodes, but there isn't one on the node I run the script from). – IVlad Aug 07 '16 at 20:26
  • At this point you're beyond my expertise. You could raise an issue with the joblib maintainers. I've e-mailed one and alerted him to this question, but they're busy people. I've also appended the answer to point to the experimental dask-learn package – MRocklin Aug 08 '16 at 20:16
  • Ok, thanks. I tried dklearn, but unfortunately it just gets stuck for me, seems to never finish. Will keep at it. – IVlad Aug 08 '16 at 22:21
  • Update: also tried `ipyparallel`, same thing I described with `dask`. The workers (engines in ipyparallel) are successfully created, the client sees them, but my grid searches do not run on them. – IVlad Aug 08 '16 at 23:37
  • I took the liberty to edit your answer with working `sklearn` examples, as I figured them out with the help of `sklearn` developers. Please let me know if you're happy with it, in which case I'll award the bounty. – IVlad Aug 10 '16 at 20:05
  • Cool. I'm surprised that you had to call `register_parallel_backend('distributed', DistributedBackend)`. This should already be handled in `distributed.joblib`. Perhaps sklearn is packing along their own version of the joblib library now? – MRocklin Aug 10 '16 at 20:28
  • Yes, it is apparently. That's why you have to import the ones they use, not the one installed on your platform. And that's what made things confusing for me, since all examples were importing the platform joblib, not sklearn's. Ah well, at least it's taken care of. – IVlad Aug 10 '16 at 20:39
  • In the end I hope that the solution ends up working out well for you – MRocklin Aug 10 '16 at 20:48
  • @IVlad, I ran your ipyparallel example above and I can see all 8 workers busy executing. Thank you for providing it. However, when I use RandomizedSearchCV with different model (sklearn_crfsuite) only one worker is active. crf model provides the same methods as other sklearn models, so I'm not sure what's happening. – Kai Oct 23 '17 at 16:26
  • when using ipyparallel do i have to sync the imports? https://stackoverflow.com/questions/33722330/how-to-best-share-static-data-between-ipyparallel-client-and-remote-engines – George Pamfilis Feb 27 '18 at 16:42