0

I am performing a simulation in which I want to save the snapshots of the state vector, and calculate do it for different parameters. I have two control parameters that I which to scan (p and a in the example bellow). I therefore save the simulation results for one netCDF4 file, in which two of the dimensions are for the two control parameters. When I run the simulation for one parameters set the file is saved correctly, but when I try to run apply_async from multiprocessing the netCDF4 at the end of the process is unaccessible.

My full code is at this github repository, but basically what I'm trying to do is this:

import multiprocessing as mp
import time as timer
import netCDF4
import numpy as np
def run_sim_for_p_a(p,a,pstep,astep,step,max_time,u0,fname):
    time_ar=np.arange(0,max_time,step)
    u = np.ones((len(time_ar),1024))
    u[0]=u0
    print "Calculating for p,a:",p,a
    for i,t in enumerate(time_ar[1:]):
        u[i+1] = u[i]*np.cos(t)*np.sin(a)*np.sin(p)
    for tstep,t in enumerate(time_ar):
        save_p_a_snapshot(fname,pstep,astep,tstep,p,a,t,u[tstep]) # save the results into the netCDF4 file

def apply_async_and_save_grid(pmin,pmax,fname,
                              Np=10,Na=10,
                              step=None,max_time=500.0,numproc=10):
    start = timer.time()
    setup_p_a_scan(fname) # Setup a netCDF4 file for the simulations
    if step is None:
        step=max_time
    p_range = np.linspace(pmin,pmax,Np)
    init = np.random.random((1024))
    a_range = np.linspace(0,1,Na)
    availble_cpus = int(available_cpu_count() - 2)
    numproc=min(numproc,availble_cpus)
    print "Using",numproc," processors"
    pool = mp.Pool(processes=numproc)
    for i,p in enumerate(p_range):
        for j,a in enumerate(a_range):
            pool.apply_async(run_sim_for_p_a,
                             args = (p,a,i,j,step,max_time,init,fname))
    pool.close()
    pool.join()
    print "Took ",timer.time()-start
if __name__=="__main__":
    apply_async_and_save_grid(1.0,2.0,"test",Np=2,Na=4,step=1.0,max_time=10)
Ohm
  • 2,312
  • 4
  • 36
  • 75
  • 1
    You have multiple processes running asychronously, Not *communicating*, accessing the same disk file.?! Might want to watch this talk, [Keynote on Concurrency](https://www.youtube.com/watch?v=9zinZmE3Ogk) – wwii Dec 31 '17 at 16:30
  • Yes, the processes do not communicate between them, but save the results to the same disk file. – Ohm Jan 01 '18 at 09:02

1 Answers1

1

There are at least two possible approaches:

You could have each worker process write its results to its own netCDF4 file, and have the main program merge them after all workers are finished.

I'm not familiar with the netCDF format, but assuming it is possible to append to such files, another possibility is to create a multiprocessing.Lock before starting apply_async. This lock should be added to the parameters for the worker process. The worker process should acquire the lock, open the netcdf file, write to it and close it. Then it should release the lock. This will ensure that only one process at a time will be writing to the netCDF file.

Edit: See the answer to this question on how to handle a Lock with a Pool.

Roland Smith
  • 42,427
  • 3
  • 64
  • 94
  • Excellent, I had to add either a manager or a global lock, since I use a pool of workers, so I've referred to this thread - https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes – Ohm Jan 01 '18 at 08:09
  • @Ohm Excellent find! There are some subtleties w.r.t. a `Pool` and `Lock` that I hadn't realized. I've updated my answer to match. – Roland Smith Jan 01 '18 at 08:17