
I have two classes. One called algorithm and the other called Chain. In algorithm, I create multiple chains, which are going to be a sequence of sampled values. I want to run the sampling in parallel at the chain level.

In other words, the algorithm class instantiates n chains and I want to run the _sample method, which belongs to the Chain class, for each of the chains in parallel within the algorithm class.

Below is a sample code that attempts what I would like to do.

I have seen a similar question here: Apply a method to a list of objects in parallel using multi-processing. But, as shown in the function _sample_chains_parallel_worker, this method does not work for my case (I am guessing it is because of the nested class structure).

Question 1: Why does this not work for this case?

The method _sample_chains_parallel also does not even run in parallel.

Question 2: Why?

Question 3: How do I sample each of these chains in parallel?

import time
import multiprocessing

class Chain():

    def __init__(self):
        self.thetas = []

    def _sample(self):
        for i in range(3):
            time.sleep(1)
            self.thetas.append(i)

    def clear_thetas(self):
        self.thetas = []

class algorithm():

    def __init__(self, n=3):
        self.n = n
        self.chains = []

    def _init_chains(self):
        for _ in range(self.n):
            self.chains.append(Chain())

    def _sample_chains(self):
        for chain in self.chains:
            chain.clear_thetas()
            chain._sample()

    def _sample_chains_parallel(self):
        pool = multiprocessing.Pool(processes=self.n)
        for chain in self.chains:
            chain.clear_thetas()
            pool.apply_async(chain._sample())
        pool.close()
        pool.join()

    def _sample_chains_parallel_worker(self):

        def worker(obj):
            obj._sample()

        pool = multiprocessing.Pool(processes=self.n)
        pool.map(worker, self.chains)

        pool.close()
        pool.join()


if __name__=="__main__":
    import time

    alg = algorithm()
    alg._init_chains()

    start = time.time()
    alg._sample_chains()
    end = time.time()
    print "sequential", end - start

    start = time.time()
    alg._sample_chains_parallel()
    end = time.time()
    print "parallel", end - start

    start = time.time()
    alg._sample_chains_parallel_worker()
    end = time.time()
    print "parallel, map and worker", end - start
leonard
1 Answer


In _sample_chains_parallel you are calling chain._sample() instead of passing the method itself; it should be pool.apply_async(chain._sample). As written, you pass the result of the call as the task, instead of letting apply_async invoke the method in a worker process.

But removing the () won't help you much, because Python 2 cannot pickle instance methods (this became possible in Python 3.5+). The pickling error is not raised unless you call get() on the result objects, so don't rejoice if you see low times for this approach: it quits almost immediately with an unraised exception.
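To see this silent-failure behavior for yourself, here is a minimal sketch (names like failing_worker and error_is_hidden_until_get are illustrative, not from the question): the pool completes join() without complaint, and the worker's exception only surfaces when get() is called on the AsyncResult.

```python
import multiprocessing

def failing_worker():
    # Raises inside the worker process; the pool swallows it silently.
    raise ValueError("boom")

def error_is_hidden_until_get():
    pool = multiprocessing.Pool(processes=1)
    result = pool.apply_async(failing_worker)
    pool.close()
    pool.join()  # completes without any visible error
    try:
        result.get()  # only now does the worker's exception surface
        return False
    except ValueError:
        return True

if __name__ == "__main__":
    print(error_is_hidden_until_get())
```

The same mechanism is why the broken apply_async(chain._sample) call can report misleadingly low timings: the pickling failure happens per task and is never re-raised.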

For the parallel versions you would have to relocate worker to module level and call pool.apply_async(worker, (chain,)) and pool.map(worker, self.chains), respectively.

Note that you forgot clear_thetas() in _sample_chains_parallel_worker. The better solution anyway would be to let Chain._sample take care of calling self.clear_thetas() itself.
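Putting those points together, a corrected sketch might look like the following (Python 3; the time.sleep(1) is dropped for brevity, and sample_chains_parallel is a name I made up for illustration). One extra caveat beyond the answer above: each worker process mutates a copy of its chain, so the sampled values must be returned to the parent explicitly.

```python
import multiprocessing

class Chain:
    def __init__(self):
        self.thetas = []

    def _sample(self):
        self.thetas = []  # _sample clears its own state, replacing clear_thetas()
        for i in range(3):
            self.thetas.append(i)

def worker(chain):
    # Module-level function, so it can be pickled and sent to workers.
    # The mutation happens on a copy of `chain` in the child process,
    # so we return the sampled values to the parent explicitly.
    chain._sample()
    return chain.thetas

def sample_chains_parallel(n=3):
    chains = [Chain() for _ in range(n)]
    with multiprocessing.Pool(processes=n) as pool:
        results = pool.map(worker, chains)
    # Copy the results back into the parent's chain objects.
    for chain, thetas in zip(chains, results):
        chain.thetas = thetas
    return [c.thetas for c in chains]

if __name__ == "__main__":
    print(sample_chains_parallel())  # -> [[0, 1, 2], [0, 1, 2], [0, 1, 2]]
```

Returning the data and reassigning it in the parent avoids any confusion about shared state: multiprocessing gives each worker its own copy of the Chain object.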

Darkonaut