I recently wrote a program built around a class for my research, and I've attempted to parallelize it. When I use Python 2.7's multiprocessing.Process with a JoinableQueue and managed data, my program eventually hangs with defunct processes.
import itertools
import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _calc_parallel(self, index):
        self._calc_bond(index)

    def run(self):
        for ts, force in itertools.izip(self.coortrj, self.forcevec):
            try:
                consumers = [mp.Process(target=self._calc_parallel,
                                        args=(force,))
                             for i in range(nprocs)]
                for w in consumers:
                    w.start()

                # Enqueue jobs
                for i in range(self.totalsites):
                    self.tasks.put(i)

                # Add a poison pill for each consumer
                for i in range(nprocs):
                    self.tasks.put(None)

                self.tasks.close()
                self.tasks.join()

                # for w in consumers:
                #     w.join()
            except:
                traceback.print_exc()
_calc_parallel calls some other class methods.
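For reference, here is a minimal, self-contained sketch of the consumer pattern I was aiming for; the worker body is a placeholder for my real calculation. Note that every get() must be matched by a task_done() call, or tasks.join() blocks forever, which may well be the source of my hang:

```python
import multiprocessing as mp

def worker(tasks, results):
    # Pull indices until the poison pill (None) arrives; each get()
    # is matched by task_done() so that tasks.join() can return.
    for index in iter(tasks.get, None):
        results.put(index * 2)   # placeholder for the real calculation
        tasks.task_done()
    tasks.task_done()            # account for the poison pill itself

if __name__ == '__main__':
    tasks = mp.JoinableQueue()
    results = mp.Queue()
    nprocs = 2
    consumers = [mp.Process(target=worker, args=(tasks, results))
                 for _ in range(nprocs)]
    for w in consumers:
        w.start()
    for i in range(5):           # enqueue jobs
        tasks.put(i)
    for _ in range(nprocs):      # one poison pill per consumer
        tasks.put(None)
    tasks.join()                 # blocks until every item is task_done()
    for w in consumers:
        w.join()
    print(sorted(results.get() for _ in range(5)))  # [0, 2, 4, 6, 8]
```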
I have even tried to use multiprocessing.Pool for this purpose, with the copy_reg workaround described at http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods.
import itertools
import multiprocessing as mp
import traceback

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _use_force(self, force):
        # Calculate data
        pass

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.Pool(processes=nprocs, maxtasksperchild=2)
            args = itertools.izip(range(self.totalsites),
                                  itertools.repeat(force))
            pool.map_async(self._calc_parallel, args)
            pool.close()
            pool.join()
        except:
            traceback.print_exc()
However, pool.map_async does not seem to call self._calc_parallel at all. I know that in both cases (Process and Pool) I'm overlooking something, but I'm not exactly clear as to what. I am typically processing over 40,000 elements.
Thanks for the help.
Update
After reading over several other posts, I also tried pathos.multiprocessing.
import itertools
import pathos.multiprocessing as mp

class Paramfit(object):
    def __init__(self):
        pass

    def _calc_bond(self, index):
        # Calculate data
        pass

    def _use_force(self, force):
        # Calculate data
        pass

    def _calc_parallel(self, index, force):
        self._calc_bond(index)
        self._use_force(force)

    def run(self):
        try:
            pool = mp.ProcessingPool(nprocs)
            args = itertools.izip(range(self.totalsites),
                                  itertools.repeat(force))
            pool.amap(lambda x: self._calc_parallel(*x), args)
        except:
            traceback.print_exc()
And, as with my previous attempts, this too seems to race through quickly without ever calling the method.
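I now suspect part of the problem is that amap, like multiprocessing's map_async, returns immediately with an asynchronous result object; nothing forces the work to complete unless get() is called on it before the program moves on. The standard-library equivalent illustrates the behavior:

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(2)
    result = pool.map_async(square, range(5))  # returns immediately
    # Without this get(), the caller races ahead and may finish
    # before any worker has run.
    print(result.get())  # [0, 1, 4, 9, 16]
    pool.close()
    pool.join()
```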
Update 2
I decided to revamp the code and split my behemoth class into smaller, more manageable components. However, if I use pathos.multiprocessing, I run into a different situation than previously posted (see link). My new code has an object that can be used for the calculation and that should return a value via its methods.
import itertools
import pandas as pd
import pathos.multiprocessing as mp

class ForceData(object):
    def __init__(self, *args, **kwargs):
        # Setup data
        self.value = pd.DataFrame()

    def calculateBondData(self, index):
        # Calculation
        return self.value

    def calculateNonBondedData(self, index):
        # Calculation
        return self.value

    def calculateAll(self, index):
        # Because self.value is a pandas.DataFrame, changed internally
        self.calculateBondData(index)
        self.calculateNonBondedData(index)
        return self.value

class ForceMatrix(object):
    def __init__(self, *args, **kwargs):
        # Initialize data
        self._matrix = pd.DataFrame()

    def map(self, data):
        for value in data.get():
            for i, j in itertools.product(value.index, repeat=2):
                self._matrix.loc[[i], [j]] += value.values

def calculate(*args, **kwargs):
    # Setup initial information.
    fd = ForceData()
    matrix = ForceMatrix()
    pool = mp.ProcessingPool()
    data = pool.amap(fd.calculateAll, range(x))
    matrix.map(data)
    return matrix
I thought that a separate function func(dataobj, force) might work around the pickling problem, but this doesn't seem to help either. At the current rate, I estimate a complete calculation on a single processor to take over 1,000 hours, which is far too long for something that should be quicker.
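To clarify what I meant by a separate function: a module-level wrapper along these lines, sketched here with a stub in place of my real ForceData class and an index argument for illustration:

```python
import multiprocessing as mp

class ForceDataStub(object):
    # Minimal stand-in for my real ForceData class.
    def calculateAll(self, index):
        return index + 1

def func(dataobj, index):
    # A plain module-level function pickles under Python 2.7,
    # unlike a bound method, so a pool can ship it to workers
    # together with the (picklable) object it operates on.
    return dataobj.calculateAll(index)

if __name__ == '__main__':
    pool = mp.Pool(2)
    fd = ForceDataStub()
    jobs = [pool.apply_async(func, (fd, i)) for i in range(4)]
    print([j.get() for j in jobs])  # [1, 2, 3, 4]
    pool.close()
    pool.join()
```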
Update 3 (4/30/15)
Because of @MikeMcKerns's helpful insights, I may have settled upon a possible solution. On an iMac (quad-core) or a 16-core node of a cluster, I have found that, for a coarse-grained (CG) system with no bonds (1000 CG sites), a double itertools.imap seems to be my best solution, clocking in at approximately 5.2 s per trajectory frame. When I move on to a system that includes some bond details (3000 CG sites representing water), I found that, on the iMac (using 1 core), itertools.imap followed by pathos.ThreadingPool.uimap (4 threads) clocks in at approximately 85 s/frame; if I attempt the process pool (4 cores x 2) / thread pool (4 threads) combination suggested in the comments by @MikeMcKerns, computation time increases by a factor of 2.5. On the 16-core cluster (32 pp/16 tp), this CG system also runs slowly (approx. 160 s/frame). A CG system with 42,778 sites and numerous bonds clocks in at around 58 min/frame on the iMac (1 core/4 threads). I have yet to test this large system on a 16-core node of the cluster, but I'm unsure whether using the process pool/thread pool will speed it up any further.
Examples:
# For a CG system with no bond details
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = itertools.imap(func2, data1)
    for values in data2:
        func3(values)

# For a system with bond details
import pathos.multiprocessing as mp

tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = itertools.imap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)

# Seems to be the slowest of the bunch on the iMac, and possibly on 16 cores of a node.
ppool = mp.ProcessingPool(mp.cpu_count() * 2)
tpool = mp.ThreadingPool(mp.cpu_count())
for i in range(nframes):
    data1 = ppool.uimap(func1, range(nsites))
    data2 = tpool.uimap(func2, data1)
    for values in data2:
        func3(values)
I suspect that the larger the system, the more benefit I may gain from multiprocessing. I know that the large CG system (42,778 sites) takes approximately 0.08 s/site, compared with 0.02 s/site (3000 CG sites) or 0.05 s/site (1000 sites, no bonds).
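As a quick sanity check, the per-site figure for the large system is consistent with the per-frame time I quoted above:

```python
# 42,778 sites at ~0.08 s/site:
seconds_per_frame = 42778 * 0.08
print(seconds_per_frame / 60.0)  # ~57 minutes/frame, matching the ~58 min observed
```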
Amidst my striving to shave off computation time, I discovered areas where I could trim down some of the computations (e.g., global variables and algorithm changes), but if I could reduce it further with full-scale multiprocessing, that would be great.