
I tried (unsuccessfully) to parallelize a loop using multiprocessing. Here is my Python code:

from MMTK import *
from MMTK.Trajectory import Trajectory, TrajectoryOutput, SnapshotGenerator
from MMTK.Proteins import Protein, PeptideChain
import numpy as np

filename = 'traj_prot_nojump.nc'

trajectory = Trajectory(None, filename)
universe = trajectory.universe
proteins = universe.objectList(Protein)
chain = proteins[0][0]

def calpha_2dmap_mult(t = range(0,len(trajectory))):
    dist = []
    global trajectory
    universe = trajectory.universe
    proteins = universe.objectList(Protein)
    chain = proteins[0][0]
    traj = trajectory[t]
    dt = 1000 # calculate distance every 1000 steps
    for n, step in enumerate(traj):
        if n % dt == 0:
            universe.setConfiguration(step['configuration'])
            for i in np.arange(len(chain)-1):
                for j in np.arange(len(chain)-1):
                    dist.append(universe.distance(chain[i].peptide.C_alpha,
                                                  chain[j].peptide.C_alpha))
    return dist

dist1 = calpha_2dmap_mult(range(1000,2000))
dist2 = calpha_2dmap_mult(range(2000,3000))

# Multiprocessing
from multiprocessing import Pool, cpu_count

pool = Pool(processes=2)
dist_pool = [pool.apply(calpha_2dmap_mult, args=(t,)) for t in [range(1000,2000), range(2000,3000)]]

print(dist_pool[0]==dist1)
print(dist_pool[1]==dist2)

If I try Pool(processes = 1), the code works as expected, but as soon as I ask for more than one process, it crashes with this error:

python: posixio.c:286: px_pgin: Assertion `*posp == ((off_t)(-1)) || *posp == lseek(nciop->fd, 0, 1)' failed.

If someone has a suggestion, it would be very much appreciated ;-)

– guillaume

4 Answers


I suspect it's because of this:

trajectory = Trajectory(None, filename)

You open the file just once, at the start, so every worker process inherits the same open file handle. You should probably pass the filename into the multiprocessing target function and open the file in there instead.
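
A minimal sketch of that change (untested, and assuming the same MMTK objects and ranges as in the question; the trajectory.close() call at the end is an assumption about the MMTK API):

from MMTK import *
from MMTK.Trajectory import Trajectory
from MMTK.Proteins import Protein
import numpy as np
from multiprocessing import Pool

def calpha_2dmap_mult(filename, t):
    # Open the trajectory inside the worker so each process
    # gets its own private file handle.
    trajectory = Trajectory(None, filename)
    universe = trajectory.universe
    chain = universe.objectList(Protein)[0][0]
    dist = []
    dt = 1000 # calculate distance every 1000 steps
    for n, step in enumerate(trajectory[t]):
        if n % dt == 0:
            universe.setConfiguration(step['configuration'])
            for i in np.arange(len(chain)-1):
                for j in np.arange(len(chain)-1):
                    dist.append(universe.distance(chain[i].peptide.C_alpha,
                                                  chain[j].peptide.C_alpha))
    trajectory.close()
    return dist

pool = Pool(processes=2)
dist_pool = [pool.apply(calpha_2dmap_mult, args=('traj_prot_nojump.nc', t))
             for t in [range(1000, 2000), range(2000, 3000)]]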

– John Zwinck

If you are running this code on OS X or any other Unix-like system, multiprocessing uses fork() to create subprocesses.

When a process forks, its open file descriptors are shared with the child processes. As far as I can tell, the trajectory object holds a reference to such a file descriptor.

To fix this, you should place

trajectory = Trajectory(None, filename)

within calpha_2dmap_mult, to ensure that each subprocess opens the file separately.
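
A minimal sketch of that pattern, assuming the rest of the script stays as in the question: use a Pool initializer (standard multiprocessing) so each worker process opens its own copy of the file exactly once.

from multiprocessing import Pool
from MMTK.Trajectory import Trajectory

trajectory = None # set separately in each worker by init_worker

def init_worker(filename):
    # Runs once per worker process, giving every worker its own
    # file descriptor instead of the one inherited through fork.
    global trajectory
    trajectory = Trajectory(None, filename)

pool = Pool(processes=2, initializer=init_worker,
            initargs=('traj_prot_nojump.nc',))

With this in place, the question's calpha_2dmap_mult can be used unchanged: its global trajectory now refers to each worker's private copy.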

  • Thanks to your comments (@John and @Wynand), I can now use more than one process ... but the performance is not improved at all! The new script is written in the next answer! – guillaume Oct 13 '14 at 12:27

Here is the new script, which allows more than one process to be used (but shows no performance improvement):

from MMTK import *
from MMTK.Trajectory import Trajectory, TrajectoryOutput, SnapshotGenerator
from MMTK.Proteins import Protein, PeptideChain
import numpy as np
import time

filename = 'traj_prot_nojump.nc'


trajectory = Trajectory(None, filename)
universe = trajectory.universe
proteins = universe.objectList(Protein)
chain = proteins[0][0]

def calpha_2dmap_mult(trajectory = trajectory, t = range(0,len(trajectory))):
    dist = []
    universe = trajectory.universe
    proteins = universe.objectList(Protein)
    chain = proteins[0][0]
    traj = trajectory[t]
    dt = 1000 # calculate distance every 1000 steps
    for n, step in enumerate(traj):
        if n % dt == 0:
            universe.setConfiguration(step['configuration'])
            for i in np.arange(len(chain)-1):
                for j in np.arange(len(chain)-1):
                    dist.append(universe.distance(chain[i].peptide.C_alpha,
                                                  chain[j].peptide.C_alpha))
    return dist

c0 = time.time()
dist1 = calpha_2dmap_mult(trajectory, range(0,11001))
#dist1 = calpha_2dmap_mult(trajectory, range(0,11001))
c1 = time.time() - c0
print(c1) 


# Multiprocessing
from multiprocessing import Pool, cpu_count

pool = Pool(processes=4)
c0 = time.time()
dist_pool = [pool.apply(calpha_2dmap_mult, args=(trajectory, t,)) for t in
             [range(0,2001), range(3000,5001), range(6000,8001),
              range(9000,11001)]]
c1 = time.time() - c0
print(c1)


dist1 = np.array(dist1)
dist_pool = np.array(dist_pool)
dist_pool = dist_pool.flatten()
print(np.all((dist_pool == dist1)))

The time spent calculating the distances is essentially the 'same' without multiprocessing (70.1 s) as with it (70.2 s)! I was perhaps not expecting a factor-of-4 improvement, but I was at least expecting some speedup!

– guillaume

Sounds like it could be a problem with reading the netCDF file over NFS. Is traj_prot_nojump.nc on NFS storage? See this Unidata mailing list post and this post to the IDL newsgroup; the latter suggests copying the file to local storage first as a workaround.

– sappjw
  • The trick was to use pool.apply_async instead of pool.apply to get the expected performance. See http://stackoverflow.com/questions/26356757/python-multiprocessing-no-performance-gain-with-multiple-processes for the explanation. – guillaume Apr 13 '15 at 12:29
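
For reference, pool.apply blocks until each call has finished, so the four calls in the script above ran one after another even with four worker processes; apply_async submits them all before collecting the results. A minimal sketch of the change, reusing the worker function, trajectory object, and ranges from the script above:

from multiprocessing import Pool

pool = Pool(processes=4)
# apply_async returns an AsyncResult immediately instead of
# blocking like pool.apply, so all four jobs run concurrently.
jobs = [pool.apply_async(calpha_2dmap_mult, args=(trajectory, t))
        for t in [range(0, 2001), range(3000, 5001),
                  range(6000, 8001), range(9000, 11001)]]
# get() blocks until each job's result is ready.
dist_pool = [job.get() for job in jobs]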