
I am coding in Python, using mpi4py to run an optimization in parallel. I am doing Ordinary Least Squares, and my data is too large to fit on one processor, so I have a master process that spawns child processes. Each child process imports the section of the data that it works with throughout the optimization.

I am using scipy.optimize.minimize for the optimization: the child processes receive a coefficient guess from the parent process and report the sum of squared errors (SSE) back to it, and scipy.optimize.minimize iterates, trying to find a minimum of the SSE. After each iteration of the minimize function, the parent broadcasts the new coefficient guess to the child processes, which then calculate the SSE again. In the child processes, this algorithm is set up in a while loop; in the parent process, I simply call scipy.optimize.minimize.
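
To make the handshake concrete, here is a condensed, self-contained sketch of the pattern in a single file (not my actual code: the problem size, worker count, and fake data are placeholders):

# handshake_sketch.py - single-file sketch of the parent/worker handshake.
# Run with: mpiexec -n 1 python handshake_sketch.py
import sys
import numpy as np
from mpi4py import MPI
from scipy.optimize import minimize

n_coef = 3                                   # placeholder problem size

parent = MPI.Comm.Get_parent()
if parent == MPI.COMM_NULL:
    # ----- parent: drives scipy.optimize.minimize -----
    comm = MPI.COMM_SELF.Spawn(sys.executable, args=[__file__], maxprocs=2)

    def sse(beta):
        beta = np.ascontiguousarray(beta, dtype='d')
        comm.Bcast([beta, MPI.DOUBLE], root=MPI.ROOT)             # send guess
        total = np.zeros(1, dtype='d')
        comm.Reduce(None, [total, MPI.DOUBLE], op=MPI.SUM,
                    root=MPI.ROOT)                                # sum local SSEs
        comm.Bcast([np.array([1], 'i'), MPI.INT], root=MPI.ROOT)  # keep looping
        return total[0]

    result = minimize(sse, np.zeros(n_coef), method='Powell')

    # One dummy round with cont = 0 so the workers leave their loop.
    comm.Bcast([np.zeros(n_coef, dtype='d'), MPI.DOUBLE], root=MPI.ROOT)
    comm.Reduce(None, [np.zeros(1, dtype='d'), MPI.DOUBLE],
                op=MPI.SUM, root=MPI.ROOT)
    comm.Bcast([np.array([0], 'i'), MPI.INT], root=MPI.ROOT)
    comm.Disconnect()
    print(result.x)
else:
    # ----- worker: mirrors every collective the parent makes -----
    rng = np.random.RandomState(MPI.COMM_WORLD.Get_rank())
    X = rng.randn(10, n_coef)                # placeholder local data
    y = rng.randn(10)
    beta = np.zeros(n_coef, dtype='d')
    cont = np.array([1], 'i')
    while cont[0]:
        parent.Bcast([beta, MPI.DOUBLE], root=0)                  # get guess
        local = np.array([((y - np.dot(X, beta))**2).sum()], 'd')
        parent.Reduce([local, MPI.DOUBLE], None, op=MPI.SUM, root=0)
        parent.Bcast([cont, MPI.INT], root=0)                     # loop again?
    parent.Disconnect()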

The part that is giving me a problem is a nested optimization, that is, an optimization within an optimization. The inner optimization is the OLS regression described above, and the outer optimization minimizes another function that uses the coefficient from the inner optimization (the OLS regression).

So in my parent process I have two functions that I minimize, and the second function calls the first, running a fresh inner optimization on every iteration of the second function's optimization. The child processes have a nested while loop for those two optimizations.
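
Stripped of the MPI plumbing, the nested structure looks like this toy example (no MPI; the data, delta_FS, and tau here are made-up placeholders, not my real values):

# nested_sketch.py - toy illustration of the nested setup, no MPI.
import numpy as np
from scipy.optimize import minimize

rng = np.random.RandomState(0)
ZX = rng.randn(200, 2)                      # toy regressors
D = np.dot(ZX, np.array([1.5, -0.7])) + 0.1*rng.randn(200)

def ols_sse(beta, y):
    # Inner objective: sum of squared errors for one coefficient guess.
    return ((y - np.dot(ZX, beta))**2).sum()

def outer(yguess, delta_FS=1.5, tau=0.5):
    # Every evaluation of the outer objective runs a fresh inner fit.
    W = 1*(D <= yguess[0])*D                # toy stand-in for W(y)
    inner = minimize(ols_sse, np.zeros(2), args=(W,), method='Powell')
    delta_RF = inner.x[1]
    return abs(delta_RF/delta_FS - tau)

y_minned = minimize(outer, np.array([0.5]), method='Powell')
print(y_minned.x)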

Hopefully that all makes sense. If more information is needed, please let me know.

Here is the relevant code for the parent process:

import sys
import numpy as np
import scipy.optimize
from mpi4py import MPI

comm = MPI.COMM_SELF.Spawn(sys.executable, args=['IVQTparallelSlave_cdf.py'], maxprocs=processes)

# First stage: reg D on Z, X
def OLS(betaguess):
    # Send the current coefficient guess to the workers.
    comm.Bcast([betaguess, MPI.DOUBLE], root=MPI.ROOT)
    # Collect the sum of the workers' local SSEs.
    SSE = np.array([0], dtype='d')
    comm.Reduce(None, [SSE, MPI.DOUBLE], op=MPI.SUM, root=MPI.ROOT)
    # Tell the workers to go around their while loop again.
    comm.Bcast([np.array([1], 'i'), MPI.INT], root=MPI.ROOT)
    return SSE


# Here is the CDF function.
def CDF(yguess, delta_FS, tau):
    # Send y so the workers can calculate W(y); they then re-solve the
    # reduced form after every iteration: reg W(y) on Z, X.
    comm.Bcast([yguess, MPI.DOUBLE], root=MPI.ROOT)
    betaguess = np.zeros(94).astype('d')
    ###########
    # This calculates the reduced-form coefficient.
    coeffs_RF = scipy.optimize.minimize(OLS, betaguess, method='Powell')
    # This little block gets the workers' inner while loop to stop.
    comm.Bcast([betaguess, MPI.DOUBLE], root=MPI.ROOT)
    SSE = np.array([0], dtype='d')
    comm.Reduce(None, [SSE, MPI.DOUBLE], op=MPI.SUM, root=MPI.ROOT)
    cont = np.array([0], 'i')
    comm.Bcast([cont, MPI.INT], root=MPI.ROOT)
    ###########
    contCDF = np.array([1], 'i')
    # This keeps the workers' outer while loop going.
    comm.Bcast([contCDF, MPI.INT], root=MPI.ROOT)

    delta_RF = coeffs_RF.x[1]

    return abs(delta_RF/delta_FS - tau)

########### This one finds Y(1) ##############

betaguess = np.zeros(94).astype('d')

######### First Stage: reg D on Z, X ######### 
coeffs_FS = scipy.optimize.minimize(OLS, betaguess, method='Powell')

print(coeffs_FS)

# This little block gets the workers' while loops to stop.
comm.Bcast([betaguess, MPI.DOUBLE], root=MPI.ROOT)
SSE = np.array([0], dtype='d')
comm.Reduce(None, [SSE, MPI.DOUBLE], op=MPI.SUM, root=MPI.ROOT)
cont = np.array([0], 'i')
comm.Bcast([cont, MPI.INT], root=MPI.ROOT)

delta_FS = coeffs_FS.x[1]

######### CDF Function #########
yguess = np.array([3340], 'd')
CDF1 = lambda yguess: CDF(yguess, delta_FS, tau)
y_minned_1 = scipy.optimize.minimize(CDF1, yguess, method='Powell')

Here is the relevant code for the child processes:

# IVQTparallelSlave_cdf.py
import numpy as np
from mpi4py import MPI

comm = MPI.Comm.Get_parent()
rank = comm.Get_rank()

# ...
# Import this process's slice of the data: the matrices D, Y, and ZX,
# plus local_n, the number of rows held locally.
# ...
########### This one finds Y(1) ##############
######### First Stage: reg D on Z, X ######### 
cont = np.array([1],'i')
betaguess = np.zeros(94).astype('d')

# This corresponds to coeffs_FS = scipy.optimize.minimize(OLS, betaguess, method='Powell') in the parent process.
while cont[0]:
    comm.Bcast([betaguess, MPI.DOUBLE], root=0)

    # Local SSE for this process's slice of the data.
    SSE = np.array(((D - np.dot(ZX, betaguess).reshape(local_n, 1))**2).sum(), 'd')

    comm.Reduce([SSE, MPI.DOUBLE], None, op=MPI.SUM, root=0)
    comm.Bcast([cont, MPI.INT], root=0)

if rank == 0: print('1st Stage OLS regression done')

######### CDF Function ######### 
cont = np.array([1],'i')
betaguess = np.zeros(94).astype('d')
contCDF = np.array([1],'i')
yguess = np.array([0],'d')

# This corresponds to y_minned_1 = scipy.optimize.minimize(CDF1, yguess, method='Powell') in the parent process.
while contCDF[0]:
    comm.Bcast([yguess, MPI.DOUBLE], root=0)
    # This calculates the reduced-form coefficient.
    while cont[0]:
        comm.Bcast([betaguess, MPI.DOUBLE], root=0)

        W = 1*(Y <= yguess)*D
        SSE = np.array(((W - np.dot(ZX, betaguess).reshape(local_n, 1))**2).sum(), 'd')

        comm.Reduce([SSE, MPI.DOUBLE], None, op=MPI.SUM, root=0)
        comm.Bcast([cont, MPI.INT], root=0)
        # if rank == 0: print(cont)
    comm.Bcast([contCDF, MPI.INT], root=0)

My problem is that after one iteration through the outer minimization, it spits out the following error:

Internal Error: invalid error code 409e0e (Ring ids do not match) in MPIR_Bcast_impl:1328
Traceback (most recent call last):
  File "IVQTparallelSlave_cdf.py", line 100, in <module>
    if rank==0: print 'CDF iteration'
  File "Comm.pyx", line 406, in mpi4py.MPI.Comm.Bcast (src/mpi4py.MPI.c:62117)
mpi4py.MPI.Exception: Other MPI error, error stack:
PMPI_Bcast(1478).....: MPI_Bcast(buf=0x2409f50, count=1, MPI_INT, root=0, comm=0x84000005) failed
MPIR_Bcast_impl(1328): 

I haven't been able to find any information about this "ring id" error or how to fix it. Help would be much appreciated. Thanks!

  • My crystal ball tells me that some (or most) of your MPI ranks are making collective calls like `MPI_Bcast` (or its mpi4py equivalent) in a different order, or that not all of them participate in the collective calls. But my crystal ball could be wrong, so show us some code. – Hristo Iliev Sep 03 '14 at 09:02
  • Thanks @HristoIliev, I just added some of the code. So, by what your crystal ball is saying, should I use comm.Barrier() to make sure the processes don't get out of sync? – Alex Poulsen Sep 03 '14 at 18:10
  • The code you've pasted here is missing something that would terminate the outer `while` loop in the workers. – Hristo Iliev Sep 04 '14 at 15:10
  • Anyway, at first glance it looks like your collectives are matched. What you can possibly do is log, for each rank, what call it makes, e.g. `B` for broadcast and `R` for reduction. Then compare the log files - the sequences should match. – Hristo Iliev Sep 04 '14 at 15:33
  • I have code that terminates the outer while loop, I just didn't include it, sorry. I have been experimenting with the Barrier function over the last two days, but that hasn't worked yet. How would you recommend making a log file that just prints out B for broadcast and R for reduction? I tried a function that prints out function calls (found here: http://stackoverflow.com/questions/8315389/how-do-i-print-functions-as-they-are-called), but the output was unwieldy. Thanks, @HristoIliev – Alex Poulsen Sep 05 '14 at 21:44
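
For what it's worth, the per-rank B/R trace suggested in the comments could be done with a small wrapper like this (a minimal sketch; the class name and log-file naming are my own):

# trace_collectives.py - minimal sketch: log 'B' for every Bcast and
# 'R' for every Reduce, one log file per rank, then diff the files.
from mpi4py import MPI

class TracedComm(object):
    def __init__(self, comm, tag):
        self.comm = comm
        self.log = open('collectives_%s.log' % tag, 'w')

    def Bcast(self, *args, **kwargs):
        self.log.write('B\n')
        self.log.flush()                 # flush so a crash keeps the trace
        return self.comm.Bcast(*args, **kwargs)

    def Reduce(self, *args, **kwargs):
        self.log.write('R\n')
        self.log.flush()
        return self.comm.Reduce(*args, **kwargs)

# In the worker, for example:
#   comm = TracedComm(MPI.Comm.Get_parent(), 'worker%d' % MPI.COMM_WORLD.Get_rank())
# and in the parent:
#   comm = TracedComm(spawned_comm, 'parent')
# The B/R sequences in the log files should be identical; the first
# place they diverge is where the collectives get out of sync.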
