36

I am looking for a python package that can do multiprocessing not just across different cores within a single computer, but also with a cluster distributed across multiple machines. There are a lot of different python packages for distributed computing, but most seem to require a change in code to run (for example a prefix indicating that the object is on a remote machine). Specifically, I would like something as close as possible to the multiprocessing pool.map function. So, for example, if on a single machine the script is:

from multiprocessing import Pool
pool = Pool(processes = 8)
resultlist = pool.map(function, arglist)

Then the pseudocode for a distributed cluster would be:

from distprocess import Connect, Pool, Cluster

pool1 = Pool(processes = 8)
c = Connect(ipaddress)
pool2 = c.Pool(processes = 4)
cluster = Cluster([pool1, pool2])
resultlist = cluster.map(function, arglist)
Michael
  • 13,244
  • 23
  • 67
  • 115
  • Probably requires more setup than you're looking for, but you can take a look at celery for a distributed task queue. http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html – monkut Nov 12 '14 at 01:11
  • I'd check out [jug](http://luispedro.org/software/jug) – szxk Nov 12 '14 at 03:24
  • I may end up using celery, but it requires a whole lot of set up and the help files are hard to follow (disjointed, rather than clear step by step instructions with a copy of the entire script at the end). Jug's docs talk about parallelizing, but not parallelizing across different computers. – Michael Nov 14 '14 at 21:16
  • There's also a pretty comprehensive list of solutions: https://wiki.python.org/moin/ParallelProcessing – Yibo Yang Jan 29 '18 at 01:09
  • Also worth noting that there is a more recent solution that is similar to `pathos` -- a package called `dask`. – Mike McKerns Nov 15 '18 at 13:11

5 Answers5

22

If you want a very easy solution, there isn't one.

However, there is a solution that has the multiprocessing interface -- pathos -- which has the ability to establish connections to remote servers through a parallel map, and to do multiprocessing.

If you want to have a ssh-tunneled connection, you can do that… or if you are ok with a less secure method, you can do that too.

>>> # establish a ssh tunnel
>>> from pathos.core import connect
>>> tunnel = connect('remote.computer.com', port=1234)
>>> tunnel       
Tunnel('-q -N -L55774:remote.computer.com:1234 remote.computer.com')
>>> tunnel._lport
55774
>>> tunnel._rport
1234
>>> 
>>> # define some function to run in parallel
>>> def sleepy_squared(x):
...   from time import sleep
...   sleep(1.0)
...   return x**2
... 
>>> # build a pool of servers and execute the parallel map
>>> from pathos.pp import ParallelPythonPool as Pool
>>> p = Pool(8, servers=('localhost:55774',))
>>> p.servers
('localhost:55774',)
>>> y = p.map(sleepy_squared, x)
>>> y
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Or, instead you could configure for a direct connection (no ssh)

>>> p = Pool(8, servers=('remote.computer.com:5678',))
# use an asynchronous parallel map
>>> res = p.amap(sleepy_squared, x)
>>> res.get()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

It's all a bit finicky, for the remote server to work, you have to start a server running on remote.computer.com at the specified port beforehand -- and you have to make sure that both the settings on your localhost and the remote host are going to allow either the direct connection or the ssh-tunneled connection. Plus, you need to have the same version of pathos and of the pathos fork of pp running on each host. Also, for ssh, you need to have ssh-agent running to allow password-less login with ssh.

But then, hopefully it all works… if your function code can be transported over to the remote host with dill.source.importable.

FYI, pathos is long overdue a release, and basically, there are a few bugs and interface changes that need to be resolved before a new stable release is cut.

Mike McKerns
  • 33,715
  • 8
  • 119
  • 139
  • 5
    I should mention I'm the `pathos` author. – Mike McKerns Nov 15 '14 at 16:56
  • 2
    I would strongly encourage you to create a detailed setup guide from start to finish such that after running a server.py file on the server and a client.py file on the client, the client can actually access the server and run a job pooled across the client and the server. After reading this answer and your answer to my other question I am still unsure how to (a) setup the server or (b) establish a secure connection to the server. – Michael Nov 17 '14 at 22:29
  • I have no idea how to do (a). Does set up the server just mean have a server that will run python if the SSH authentication is correct? I believe you assume (b) is handled outside of python by openSSH? In the example you provide you seem to make the connection, but then the `tunnel` object is never used again and `remote.computer.com` is not included in the next pool you create. It is referenced in the "instead you could configure for a direct connection (no ssh)" but I really don't understand how that works because without SSH how am I able to authenticate to the server? – Michael Nov 17 '14 at 22:30
  • 1
    When you create a tunnel, the tunnel links a local port to a remote port. Thus, your computer just needs to send all requests to a local port, and the tunnel will pipe it to the remote server for you -- using SSH. You only need SSH to set up the tunnel, so only need to call it **once**. From then on out, you can pipe insecure communication through a secure tunnel by communicating with your own local port. If you are not using a tunnel, they you have to tell the pool to connect to the remote server. Look at some documentation for how a ssh-tunnel works. Pathos just sets one up for you. – Mike McKerns Nov 18 '14 at 07:37
  • If you are using `pp` to talk to the remote server, you need a `ppserver` running on the remote host. If you are using something else (zmq, …) then you need a server of that type running. Pathos does have some code that can start up a server for you on a remote host, but it's not totally robust, in that you need to save the jobid reference to shut it down, or otherwise you need to login and figure out which running job is your server. You can do that remotely with pathos as well, but it's not really something a you would want to get into if you aren't comfortable killing unix processes. – Mike McKerns Nov 18 '14 at 07:47
  • @Michael: I will write something up, in tutorial style, as there is definitely room for improvement in the documentation of use cases. I'll probably link to something on SSH-tunneling (and also setting up remote hosts). If you want to know what the ssh-tunnel is doing, you can easily check the `__repr__` of the tunnel… it shows the command that `pathos` is using under the covers. With that, you can look at the SSH docs to see what the command does. – Mike McKerns Nov 18 '14 at 19:16
  • @MikeMcKerns did you ever end up writing that tutorial? I'm interested in doing distributed map but the documentation leaves a lot to be desired – Jonno_FTW Jan 07 '19 at 23:38
  • Started here: https://github.com/mmckerns/tuthpc, but it's in no way complete yet. – Mike McKerns Jan 07 '19 at 23:59
  • More breadcrumbs for the next person to come across this question. There is a small basic example in the last few blocks of https://github.com/mmckerns/tuthpc/blob/master/multiprocessing.ipynb that shows how to spawn a server and test to see if you can connect and run on it. I was having issues because installing pathos through pip on winpython wasn't installing the ppserver.py script, but I was able to grab it from https://github.com/uqfoundation/ppft/blob/ppft-1.6.6.3/ppft/server/ppserver and get things working, at least locally. Change the tag there to the latest release when you grab it – Scott Mar 31 '21 at 03:10
  • The OP might like to know about the alternative `dask` that @MikeMcKerns himself named in 2018 in his comment https://stackoverflow.com/questions/26876898/python-multiprocessing-with-distributed-cluster?lq=1#comment93520490_26876898 – user2987828 Sep 23 '21 at 13:38
  • @user2987828: yes, dask is a reasonable alternative, and is very similar to pathos family of codes. dask wasn't really an option back when the question was asked, but it is now. – Mike McKerns Sep 24 '21 at 12:01
  • In your example you showed how to connect SSH tunnel, and separately you showed how to use Pool. Can you combine these two parts to show how to create pool over SSH? Right now it is not obvious how to do this in `pathos`. – Arty Jan 30 '22 at 15:17
  • @Arty: the first example shows how to do it. Note that the Pool server is localhost at the same port that the local end of the Tunnel points to. – Mike McKerns Jan 31 '22 at 14:09
  • @MikeMcKerns My appologize, correct, didn't notice that in pool you user localhost as a server, same localhost port that was used as tunnel's local port. – Arty Jan 31 '22 at 14:18
20

I'd suggest taking a look at Ray, which aims to do exactly that.

Ray uses the same syntax to parallelize code in the single machine multicore setting as it does in the distributed setting. If you're willing to use a for loop instead of a map call, then your example would look like the following.

import ray
import time

ray.init()

@ray.remote
def function(x):
    time.sleep(0.1)
    return x

arglist = [1, 2, 3, 4]

result_ids = [function.remote(x) for x in arglist]
resultlist = ray.get(result_ids)

That will run four tasks in parallel using however many cores you have locally. To run the same example on a cluster, the only line that would change would be the call to ray.init(). The relevant documentation can be found here.

Note that I'm helping to develop Ray.

Robert Nishihara
  • 3,276
  • 16
  • 17
12

A little late to the party here, but since I was also looking for a similar solution, and this question is still not marked as answered, I thought I would contribute my findings.

I ended up using SCOOP. It provides a parallel map implementation that can work across multiple cores, across multiple hosts. It can also fall back to Python's serial map function if desired during invocation.

From SCOOP's introduction page, it cites the following features:

SCOOP features and advantages over futures, multiprocessing and similar modules are as follows:

  • Harness the power of multiple computers over network;
  • Ability to spawn multiple tasks inside a task;
  • API compatible with PEP-3148;
  • Parallelizing serial code with only minor modifications;
  • Efficient load-balancing.

It does have some quirks (functions/classes must be pickleable), and the setup to get things running smoothly across multiple hosts can be tedious if they don't all share the same filesystem schema, but overall I'm quite happy with the results. For our purposes, doing quite a bit of Numpy & Cython, it provides excellent performance.

Hope this helps.

Richard
  • 56,349
  • 34
  • 180
  • 251
bazel
  • 299
  • 7
  • 20
  • 1
    `SCOOP` is a less capable but better supported package than `pathos`… but still, it's not a bad choice. As far as I know, `pathos` and `SCOOP` are the only two such packages that provide hierarchical parallel/distributed maps. – Mike McKerns Aug 14 '15 at 11:28
  • Thanks for this suggestion. Can you say a little more / do you have example code for getting multiple nodes running? This link is the closest thing I could find to actually getting it set up and running across multiple machines, but it's woefully inadequate. http://scoop.readthedocs.org/en/0.7/install.html#remote-usage – Michael Aug 14 '15 at 20:56
  • 1
    @MikeMcKerns, I have also seen Apache Spark. Can you explain how that differs from pathos (or SCOOP)? – Michael Aug 14 '15 at 22:54
  • @Michael . Getting multiple nodes setup isn't that tricky. The documentation [here] (http://scoop.readthedocs.org/en/0.7/usage.html#how-to-launch-scoop-programs) covers that, under Hostfile format. However, ensuring that the master(broker) and all remote hosts have exactly the same directory layout, code, dependencies, and access to any external data will help make things easier and save you tons of debugging time. It our case, it made sense to rsync data and code to all worker hosts before launching. The SCOOP community is small, but also helpful as well. – bazel Aug 14 '15 at 23:58
  • We also looked at Spark, Celery, Jug, and a few others. Given the small team of researchers and bare metal we have to work with, we looked for solutions that wouldn't require a different programming paradigm, significant refactoring, nor have some other server process that we need to admin on all the worker hosts. We were simply aiming for the most cost effective (in terms of time) way to parallelize complex code that was already there. – bazel Aug 15 '15 at 00:16
  • Apache spark was originally at Berkley, and is now Apache. It's part of a larger ecosystem, and is very stable, and more broadly supported than `pathos` or `SCOOP`. It provides parallelism on a lot of different backends, however, I don't think it provides hierarchical parallel, like `pathos` and `SCOOP` do. – Mike McKerns Aug 15 '15 at 01:33
  • @bazel: `pahos` was specifically built to provide the same API across parallel and distributed backends, so you don't have to edit your code at all to switch from parallel to distributed, or threads to processors to sockets. The idea is to code with a `map`, `pipe`, etc type of constructs that are available in standard python…. and you don't have to change your code really at all to go to distributed/parallel. `pathos` is probably the easiest in that regard, as that's how it was designed to work. – Mike McKerns Aug 15 '15 at 01:37
  • I believe `SCOOP` has a similar philosophy, but requires slightly more overhead for the parallel programming models. Essentially, both it and `pathos` operate on the idea of setting up and executing an AST. The `pathos`' API is almost identical to that of `multiprocessing` --- but works for any of the backends. – Mike McKerns Aug 15 '15 at 01:41
  • 2
    @MikeMcKerns: We parallelized our code on SCOOP with **literally** two lines of code. One line for the import statement (`from scoop import futures`) and the other replacing Python's built-in serial map with SCOOP's map (`futures.map(func,arraydata)`). Couldn't be easier. – bazel Aug 15 '15 at 02:43
  • @bazel: I know how easy it is with `SCOOP`, and it's not always as simple as the two lines above. That is what a "programming model" does, it makes parallelism transparent. What I'm saying is that `pathos` has the same two line philosophy, but `pathos` has more cases that it can handle where it truly does work in the two lines… where you don't have to change your code at all from serial -- just import and overload the `map` I'm glad you had a good experience with `SCOOP`. It's a good package. If I would have to use something aside from `pathos`, that's what I'd pick. – Mike McKerns Aug 15 '15 at 09:47
  • Would be great if you include full example code of how to compute some function (like squaring numbers) using SCOOP over a cluster of computers. Interesting to see here 1) how to setup cluster 2) how to provide list of workers hosts in the client code 3) how to authenticate by password 4) how to encrypt traffic and how to disable encryption 5) how to tweak number of cores used on each host. Also would be nice to know how fast is this framework compared to others if there are such comparisons anywhere. – Arty Nov 02 '20 at 12:07
0

Quite late but hopefully helpful to others.

Nowadays, one can use mpi4py.futures (introduced in version 3.0.0) and MPIPoolExecutor. See the docs https://mpi4py.readthedocs.io/en/stable/mpi4py.futures.html.

Then your code looks rather similar to what you do with multiprocessing. Your python script process.py would look like

from mpi4py.futures import MPIPoolExecutor

def myfunc(a):
    # do something heavy here
    return a

if __name__ == '__main__':
    # init arglist
    # arglist = [ <...> ]
    with MPIPoolExecutor() as pool:
        resultlist = pool.map(myfunc, arglist)

which is then called in your batch script via e.g.

mpiexec -n <#processes, e.g. 12> python -m mpi4py.futures process.py
Stefan
  • 1,697
  • 15
  • 31
-1

Have you looked to disco?

Features:

  • Map / Reduce paradigm
  • Python programming
  • Distributed shared disk
  • ssh underlaying transport
  • web and console interfaces
  • easy to add/block/delete a node
  • master launch slaves nodes without user intervention
  • slaves nodes are automatically restarted in case of failure
  • nice documentation. Following the Install Guide I was able to launch a 2-machine cluster in a few minutes (the only thing I need to do was creating $DISCO_HOME/root folder in order to connect to the WebUI, I guess due of log file error creation).

A simple example from disco's documentation:

from disco.core import Job, result_iterator

def map(line, params):
    for word in line.split():
        yield word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)
asterio gonzalez
  • 1,056
  • 12
  • 12