
I recently wrote a program with a class for my research, and I've attempted to parallelize it. When I use Python 2.7's multiprocessing.Process with a JoinableQueue and managed data, my program eventually hangs with defunct processes.

import itertools
import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _calc_parallel(self, index):
        self._calc_bond(index)

    def run(self):
        for ts, force in itertools.izip(self.coortrj, self.forcevec):
            try:
                consumers = [mp.Process(target=self._calc_parallel,
                             args=(force,)) for i in range(nprocs)]
                for w in consumers:
                    w.start()

                # Enqueue jobs
                for i in range(self.totalsites):
                    self.tasks.put(i)

                # Add a poison pill for each consumer
                for i in range(nprocs):
                    self.tasks.put(None)

                self.tasks.close()
                self.tasks.join()

            #   for w in consumers:
            #       w.join()
            except:
                traceback.print_exc()

_calc_parallel calls some other class methods.

I have even tried to use multiprocessing.Pool for this purpose, using the copy_reg workaround for pickling instance methods described at http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods.
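For reference, the copy_reg recipe from that link looks roughly like the following (Python 2 only; the helper names here are paraphrased rather than copied from the post). It registers a reducer so that bound instance methods become picklable:

import copy_reg
import types

def _pickle_method(method):
    # Decompose a bound method into (name, instance, class).
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # Walk the MRO to find the function, then re-bind it to the instance.
    for klass in cls.mro():
        try:
            func = klass.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)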

import itertools
import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _use_force(self, force):
        # Calculate data
        pass

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.map_async(self._calc_parallel, args)
            pool.close()
            pool.join()
        except:
            traceback.print_exc()

However, pool.map_async does not seem to call self._calc_parallel. I know that in both cases (Process and Pool) I'm overlooking something, but I'm not exactly clear on what. I typically process over 40,000 elements.
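For what it's worth, one diagnostic step (not in my original code) is to keep the AsyncResult that map_async returns and call get() on it: worker-side exceptions only surface there, whereas a fire-and-forget map_async fails silently. A minimal sketch against the run() method above:

# Sketch only: keep the AsyncResult rather than discarding it. Worker-side
# exceptions re-raise at .get(); a Python 2.7 pickling failure during task
# dispatch instead prints a traceback from the pool's handler thread.
result = pool.map_async(self._calc_parallel, args)
pool.close()
result.get()    # blocks until the jobs finish (or re-raises their exception)
pool.join()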

Thanks for the help.

Update

After reading over several other posts, I also tried pathos.multiprocessing.

import itertools
import pathos.multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _use_force(self, force):
        # Calculate data
        pass

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.ProcessingPool(nprocs)
            args = itertools.izip(range(self.totalsites), itertools.repeat(force))
            pool.amap(lambda x: self._calc_parallel(*x), args)
        except:
            traceback.print_exc()

And, as with my previous attempts, this too races through quickly without ever calling the method.
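(Per the comments below, amap is asynchronous and returns a result object that must be kept; my code above discards it, so nothing ever blocks or runs to completion. A minimal sketch of the missing step:)

# Sketch only: pathos' amap blocks at .get(), not at the map call, so the
# returned result object must be kept and fetched.
result = pool.amap(lambda x: self._calc_parallel(*x), args)
values = result.get()   # blocks here until all tasks have completed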

Update 2

I decided to revamp the code and split my behemoth class into smaller, more manageable components. However, if I use pathos.multiprocessing, I run into a different situation, as previously posted (see link). My new code now has an object that can be used for the calculation and that should return a value via its methods.

import itertools
import pandas as pd
import pathos.multiprocessing as mp

class ForceData(object):
    def __init__(self, *args, **kwargs):
        # Setup data
        self.value = pd.DataFrame()
    def calculateBondData(self, index):
        # Calculation
        return self.value
    def calculateNonBondedData(self, index):
        # Calculation
        return self.value
    def calculateAll(self, index):
        # Because self.value is a pandas.DataFrame, changed internally
        self.calculateBondData(index)
        self.calculateNonBondedData(index)
        return self.value

class ForceMatrix(object):
    def __init__(self, *args, **kwargs):
        # Initialize data
        self._matrix = pd.DataFrame()
    def map(self, data):
        for value in data.get():
            for i, j in itertools.product(value.index, repeat=2):
                self._matrix.loc[[i], [j]] += value.values

def calculate(*args, **kwargs):
    # Setup initial information.
    fd = ForceData()
    matrix = ForceMatrix()
    pool = mp.ProcessingPool()
    data = pool.amap(fd.calculateAll, range(x))
    matrix.map(data)
    return matrix

I thought that a separate function, func(dataobj, force), might work, but this doesn't seem to help either. At the current rate, I estimate that a complete calculation on a single processor would take over 1,000 hours, which is too long for something that should be quicker.
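For reference, the standalone-function variant I alluded to looks roughly like this (a sketch only; func is my placeholder name, and fd and x come from calculate() above):

# Sketch of the standalone-function attempt: a module-level wrapper so that
# only the data object, not a bound method, is shipped to the workers.
def func(args):
    dataobj, index = args   # unpack (ForceData instance, site index)
    return dataobj.calculateAll(index)

data = pool.amap(func, [(fd, i) for i in range(x)])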

Update 3 (4/30/15)

Because of @MikeMcKerns' helpful insights, I may have settled upon a possible solution. On an iMac (quad-core) or a 16-core node of a cluster, I have found that, for a coarse-grain (CG) system with no bonds (1000 CG sites), a double itertools.imap seems to be my best solution, clocking in at approximately 5.2 s per trajectory frame. When I move on to a system that includes some bond details (3000 CG sites representing water), I found that, on the iMac (using 1 core), itertools.imap followed by pathos.ThreadingPool.uimap (4 threads) clocks in at approximately 85 s/frame; if I attempt the process pool (4 cores x 2) / thread pool (4 threads) combination suggested in the comments by @MikeMcKerns, computation time increases by a factor of 2.5. On the 16-core cluster (32 pp/16 tp), this CG system also goes slowly (approx. 160 s/frame). A CG system with 42,778 sites and numerous bonds may clock in around 58 min/frame on the iMac (1 core/4 threads). I have yet to test this large system on a 16-core node of the cluster, but I'm unsure whether using the process pool/thread pool will speed it up any further.

Examples:

# For a CG system with no bond details
import itertools

for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = itertools.imap(func2, data1)
    for values in data2:
        func3(values)

# For a system with bond details
import pathos.multiprocessing as mp

tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

# Seems to be the slowest in the bunch on iMac and possibly on 16-cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = ppool.uimap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

I suspect that the larger the system, the more benefit I may gain from multiprocessing. I know that the large CG system (42,778 sites) takes approximately 0.08 s/site, compared with 0.02 s/site (3000 CG sites) or 0.05 s/site (1000 sites, no bonds).
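(As a sanity check, 42,778 sites x 0.08 s/site ≈ 3,422 s ≈ 57 min, which is consistent with the roughly 58 min/frame observed above.)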

Amidst my striving to shave off computation time, I discovered areas where I could trim down some of the computations (e.g., global variables and algorithm changes), but if I could reduce it further with full-scale multiprocessing, that would be great.

  • It would make helping you easier if you could make your example code minimal (e.g. remove most of the method bodies which don't have anything to do with the unexpected behaviour) – tobyodavies Apr 23 '15 at 07:36
  • @tobyodavies, I can do that. I initially had it pared down but thought that someone might want to know about the rest. I can definitely remove some of it. Thanks. – Tim Apr 23 '15 at 08:20
  • It looks like you're expecting the child processes to have a side effect in the parent process. Is that right? That is, the child process modifies self somehow rather than returning its computed data. You can't do this with multiprocessing and will need to rethink your solution. If this is the case, then I can show you a minimal working example of how to do this. – Dunes Apr 23 '15 at 12:25
  • @Dunes, so you're saying that instead of using managed variables (mp.Manager.dict, etc.), I should return values from each method and then do the final processing? In my methods, one of my passed arguments is a pandas.DataFrame, and the same DataFrame is returned if I want to collect it, but one of my methods processed the data and stored it in a managed data structure. However, I guess this is incorrect thinking. I appreciate any insight that you can offer. – Tim Apr 23 '15 at 14:27
  • Hi Tim, I'm the `pathos` author. You seem to be calling `pool.amap`, which should return a result object `result` that you aren't saving anywhere. You then need to call `result.get()` to get the result. There's also `imap`, which returns an iterator, and plain old `map`, which directly returns a list of calculated values. `amap` is asynchronous, so it should not block on the map call -- it blocks at the `get`. If you want to block at the map, use `map`. – Mike McKerns Apr 24 '15 at 07:03
  • @MikeMcKerns, thanks. I realized after more searching that I was going about this the wrong way. I initially did not realize that pathos and Python's multiprocessing created copies of the object. Therefore, they were not sharing the data, which meant that my calculations were off anyway. I decided to rewrite the code, splitting my behemoth class into smaller chunks. This allowed me to create the desired copies using either amap or map_async and to perform my calculations. I was expecting too much in a single method. On a single processor, this works, but not for multiprocessing. – Tim Apr 24 '15 at 07:54
  • @MikeMcKerns, I separated it out, but then I ran across the issue of pathos choking as found in [link](http://stackoverflow.com/questions/27994321/python-multiprocessing-scipy-stats-lognorm-fit). I tried using an external function to call the object and the variable to no avail (e.g., `def func(obj, index)`). Any thoughts? – Tim Apr 24 '15 at 12:46
  • @Tim: did the answer for your link not also solve your problem, if it is failing due to a pickling? It should succeed if you make a thin wrapper around an import. Also, if you need to share data, then you might want to look into `multiprocessing.ctypes` -- with it you can create shared memory objects/arrays. It's a bit tricky, and you have to worry about thread-locking, but it can work if you need to avoid the copy that `multiprocessing` makes in the default case. – Mike McKerns Apr 24 '15 at 17:28
  • How long does `fd.CalculateAll` take for a single call? Aha… I see an issue, you don't want to use `Pool().amap` and `data.get()`… that's blocking at `get`. The better choice is `Pool().imap`, which produces an iterator… and you can then do the for loop in an iterated fashion as results come in. Or better, rewrite and do a thread pool and a processing pool, where you `get` the results only at the end of the nested map functions. – Mike McKerns Apr 24 '15 at 20:30
  • For example, see here: http://stackoverflow.com/a/28770041/2379433 – Mike McKerns Apr 24 '15 at 20:32
  • ...and http://stackoverflow.com/a/28382913/2379433 – Mike McKerns Apr 24 '15 at 20:38
  • @MikeMcKerns, thanks for the responses. Could you post an example of how I might achieve the usage of the thread pool/processing pool with the nested map function? FYI, a single calculation takes, on average, 300 ms, but with a large data set (40000 - 60000) repeated 1000x, this adds up (processing a simulation trajectory of N atoms). I'm honestly grateful that I split the class into smaller components, but I'm realizing that multiprocessing can be a headache--although worthwhile in the end. – Tim Apr 25 '15 at 00:42
  • @MikeMcKerns, I have attempted using the imap feature, but I still get raised exceptions boiling down to NotImplemented because of my class, even with a wrapper function or a partial. I'm going to look into my class and maybe scale it back as a test. I use pandas and MDAnalysis within my class, which makes me wonder whether these can be handled, but they are initially set up in the constructor. I definitely want this to work, but I'm still bamboozled. Thanks again for your comments. – Tim Apr 25 '15 at 08:08
  • An easy way to check if something can be serialized is to try something like `dill.detect.errors` or `dill.pickles` or `dill.copy`. – Mike McKerns Apr 25 '15 at 17:46
  • Here are some nested map examples: http://stackoverflow.com/a/29818165/2379433, and http://stackoverflow.com/questions/28203774/how-to-do-hierarchical-parallelism-in-ipython-parallel – Mike McKerns Apr 25 '15 at 17:51
  • @MikeMcKerns, what would you recommend: using a threading pool for the heavy calculation and a processing pool for the collection as in your last comment, or some other method? Currently, I've set up a processing pool (uimap) for the heavy-lifting and a threading pool (amap) for the final data collection. I'm wondering if I could use uimap for both. Thanks again for your help. – Tim Apr 29 '15 at 07:24
  • The choice of "iterative" map versus "async" map should be made based on whether or not you need blocking. `amap` blocks on the `get` for all results, while `uimap` blocks on the `next` only for the next result. With regard to threads or processing, I've found it works better to have one thread pool and one processing pool. I'm working on adding flexibility that could change that, but for now, that's the case. You basically have to think about how expensive the function evaluation is… and how many processes to spawn at once on your system -- you can use many more threads for light work. – Mike McKerns Apr 29 '15 at 15:10
  • @MikeMcKerns, thanks for the information. The interesting thing to me is that a small system (1000 sites) runs faster with itertools.imap. A system with 3000 sites runs fastest on my iMac (4 cores) with itertools.imap and ThreadingPool.uimap (4 threads). If I use the combo suggested above, they both slow down, especially on a 16-core machine. I'm not sure how my large system (>40,000 sites) will fare, however. Any thoughts on this? – Tim Apr 30 '15 at 14:56
  • The reason is that itertools.imap **is** cheaper… it's leveraging C parallelism, while the multiprocessing version is spawning processes. As soon as you start spawning processes, you incur a lot of overhead. For threads, you can eventually hit overhead in memory. – Mike McKerns Apr 30 '15 at 19:27
  • Your comment then raises the question: when is a good time to use parallelization? I realize that these small systems (probably fewer than 10,000 sites) may not need parallelization, but I'm wondering whether my larger systems will benefit from it, assuming that the calculation per site is under 1 second. – Tim May 01 '15 at 01:00
  • It's always a tradeoff/optimization between the overhead and the savings of the parallelism. The reason I built `pathos` in the first place, was that upon realizing that you have to be able to "experiment" a bit to discover the right level and means of parallelism… I found that it needed to be very easy to experiment. – Mike McKerns May 19 '15 at 12:33

1 Answer


Your options are fairly limited if you are using Python 2.7.

Here's a short example of calling an object's method with arguments using a pool.

The first problem is that only functions defined at the top level of a module can be pickled. Unix-based systems have a way of getting around this limitation, but it shouldn't be relied upon. So you must define a function that takes the object you want and the arguments needed to call the relevant method.

For instance:

def do_square(args):
    squarer, x = args # unpack args
    return squarer.square(x)

The Squarer class might look like this:

class Squarer(object):
    def square(self, x):
        return x * x

Now to apply the square function in parallel.

from multiprocessing import Pool

if __name__ == "__main__":
    # all pool work must be done inside the if statement otherwise a recursive
    # cycle of Pool spawning will be created.

    pool = Pool()
    sq = Squarer()
    # create args as a sequence of tuples
    args = [(sq, i) for i in range(10)]
    print pool.map(do_square, args)

    pool.close()
    pool.join()

Note that the squarer is not stateful. It receives data, processes it, and returns the result. This is because the child's state is separate from the parent's. Changes in one will not be reflected in the other unless Queues, Pipes, or other shared-state classes made available by multiprocessing are used. In general, it is better to return the computed data from the child process and then do something with that data in the parent process than to try to store the data in some shared variable that the child process has access to.

Example of stateful squarer not working with multiprocessing:

from multiprocessing import Pool  # do_square as defined above

class StatefulSquarer(object):

    def __init__(self):
        self.results = []

    def square(self, x):
        self.results.append(x * x)

if __name__ == "__main__":

    print("without pool")
    sq = StatefulSquarer()
    map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    print("with pool")
    pool = Pool()
    sq = StatefulSquarer()
    pool.map(do_square, [(sq, i) for i in range(10)])
    print(sq.results)

    pool.close()
    pool.join()

If you wanted to make this work, then a better solution would be something like:

for result in pool.map(do_square, [(sq, i) for i in range(10)]):
    sq.results.append(result)

What if your class is very big, but immutable? Every time you start a new task in map, this huge object has to be copied over to the child process. You can, however, save time by copying it over to each child process only once.

from multiprocessing import Pool

def child_init(sq_):
    global sq
    sq = sq_

def do_square(x):
    return sq.square(x)

class Squarer(object):
    def square(self, x):
        return x * x

if __name__ == "__main__":
    sq = Squarer()
    pool = Pool(initializer=child_init, initargs=(sq,))

    print(pool.map(do_square, range(10)))

    pool.close()
    pool.join()
Dunes
  • How would this work by calling Pool within a class as in my example code? I call Pool from within one method to invoke another method. I read that pathos may offer a solution, but, as noted in my update, this didn't seem to work. Would using multiprocessing.Process with a JoinableQueue prove any better? If so, how do I avoid the defunct process issue? – Tim Apr 23 '15 at 14:29
  • If you have kept up with my discussion with Mike and read my latest update, you'll notice that I may have come across a solution. However, I want to let you know that your solution was helpful. It gave me some ideas for another project that I'm working on, and although your answer may not have been the solution for my current problem, it may be for another project. Thank you again for your answer; I appreciate it. – Tim Apr 30 '15 at 15:32