1

I was using multiprocessing without problems in the earlier, messy version of my code. I decided to tidy it up and rewrite it as a class so that I can easily change the inputs. My new code is as follows:

class LikelihoodTest:
      def __init__(self,Xgal,Ygal):
          self.x=Xgal
          self.y=Ygal
          self.objPosition=gal_pos
          self.beta_s=beta
          self.RhoCrit_SigmaC=rho_c_over_sigma_c
          self.AngularDiameter=DA
          self.RhoCrit=rho_crit
          self.Reducedshear=observed_g
          self.ShearError=g_err
      #The 2D function
      def like2d(self,posx, posy):
          stuff=[self.objPosition, self.beta_s, self.RhoCrit_SigmaC , self.AngularDiameter, self.RhoCrit]
          m=4.447e14
          c=7.16
          param=[posx, posy, m, c]
          return reduced_shear( param, stuff, self.Reducedshear, self.ShearError)
      def ShearLikelihood(self):
          n=len(self.x)
          m=len(self.y)
          shared_array_base = multiprocessing.Array(ctypes.c_double, n*m)
          shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
          shared_array = shared_array.reshape( n,m)
          #Restructure the function before you create instance of Pool.
          # Parallel processing
          def my_func(self,i, def_param=shared_array):
              shared_array[i,:] = np.array([float(self.like2d(self.x[j],self.y[i])) for j in range(len(self.x))])
          while True:
                try:
                   print "processing to estimate likelihood in 2D grids......!!!"
                   start = time.time()
                   pool = multiprocessing.Pool(processes=10)
                   pool.map(my_func, range(len(self.y)))
                   print shared_array
                   end = time.time()
                   print "process time:\n",end - start
                   pool.close()
                except ValueError:
                   print "Oops! value error!"
          return shared_array
      def plotLikelihood(self,shared_array):
          #plotting on a mesh the likelihood function in order to see whether you have defined the inputs correctly and you can observe the maximum likelihood in 2D
          # Set up a regular grid of interpolation points
          xi, yi = np.linspace(self.x.min(), self.x.max(), 100), np.linspace(self.y.min(), self.y.max(), 100)
          # Interpolate
          rbf = scipy.interpolate.interp2d(self.x, self.y,shared_array , kind='linear')
          zi = rbf(xi, yi)
          fig, ax = plt.subplots()
          divider = make_axes_locatable(ax)
          im = ax.imshow(zi, vmin=shared_array.min(), vmax=shared_array.max(), origin='lower',
                        extent=[self.x.min(), self.x.max(), self.y.min(),self.y.max()])
          ax.set_xlabel(r"$Xpos$")
          ax.set_ylabel(r"$Ypos$")
          ax.xaxis.set_label_position('top')
          ax.xaxis.set_tick_params(labeltop='on')
          cax = divider.append_axes("right", size="5%", pad=0.05)
          cbar = fig.colorbar(im,cax=cax, ticks=list(np.linspace(shared_array.max(), shared_array.min(),20)),format='$%.2f$')
          cbar.ax.tick_params(labelsize=8) 
          plt.savefig('/users/Desktop/MassRecons/Likelihood2d_XY_Without_Shear_Uncertainty.pdf', transparent=True, bbox_inches='tight', pad_inches=0)
          plt.close()

I got the following error when I tried to run it with the class configuration:

if __name__ == '__main__':
     Xgal = np.linspace(Xgalaxy.min(), Xgalaxy.max(), 1000)
     Ygal = np.linspace(Ygalaxy.min(), Ygalaxy.max(), 1000)          
     Test=LikelihoodTest(Xgal,Ygal) 
     Test.ShearLikelihood()
processing to estimate likelihood in 2D grids......!!!
ERROR: PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed [multiprocessing.pool]
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 34, in ShearLikelihood
  File "/vol/1/anaconda/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/vol/1/anaconda/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Is there any way to fix it?

Dalek
  • 1
    Yes. Define `my_func` at the module level or as an instance method. Python is very limiting as to what it can and can't pickle and it won't pickle nested functions. – Joel Cornett Aug 19 '14 at 12:40
  • @JoelCornett where could I define `shared_array` because it is an input for `my_func` and it is a local variable? – Dalek Aug 19 '14 at 13:16
  • 1
    Well, for one, you could pass it in as part of a `tuple` of arguments (i.e. `zip()` it to `range()`). Incidentally, I don't know why you need to wrap everything in a class. Also, I'm not sure you meant to wrap your `pool.map` call inside of a `while` loop? – Joel Cornett Aug 19 '14 at 13:53
  • @JoelCornett since there are two loops, I don't know how else to do it. The `while` is not necessary; I can just remove it. But I need the class structure! – Dalek Aug 19 '14 at 14:46

2 Answers

2

I finally figured out how to get multiprocessing working in my class. I used pathos.multiprocessing and changed the code as follows:

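import time
import numpy as np
import scipy.interpolate
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import pathos.multiprocessing as multiprocessing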

class LikelihoodTest:
      def __init__(self,Xgal,Ygal):
          self.x=Xgal
          self.y=Ygal
          self.objPosition=gal_pos
          self.beta_s=beta
          self.RhoCrit_SigmaC=rho_c_over_sigma_c
          self.AngularDiameter=DA
          self.RhoCrit=rho_crit
          self.Reducedshear=observed_g
          self.ShearError=g_err
          #The 2D function
      def like2d(self,posx, posy):
          stuff=[self.objPosition, self.beta_s, self.RhoCrit_SigmaC , self.AngularDiameter, self.RhoCrit]
          m=4.447e14
          c=7.16
          param=[posx, posy, m, c]
          return reduced_shear( param, stuff, self.Reducedshear, self.ShearError)
      def ShearLikelihood(self,r):
          return [float(self.like2d(self.x[j],r)) for j in range(len(self.x))]
      def run(self):
          print "processing to estimate likelihood in 2D grids......!!!"
          start = time.time()
          pool = multiprocessing.Pool(processes=10)
          try:
              # One task per y value; each task returns a full row over self.x
              results=np.array( pool.map(self.ShearLikelihood, list(self.y) ))
          finally:
              # Close the pool even if a task raises, so results is never returned unset
              pool.close()
          end = time.time()
          print "process time:\n",end - start
          return results
      def plotLikelihood(self,shared_array):
          #plotting on a mesh the likelihood function in order to see whether you have defined the inputs correctly and you can observe the maximum likelihood in 2D
          # Set up a regular grid of interpolation points
          xi, yi = np.linspace(self.x.min(), self.x.max(), 100), np.linspace(self.y.min(), self.y.max(), 100)
          # Interpolate
          rbf = scipy.interpolate.interp2d(self.x, self.y,shared_array , kind='linear')
          zi = rbf(xi, yi)
          fig, ax = plt.subplots()
          divider = make_axes_locatable(ax)
          im = ax.imshow(zi, vmin=shared_array.min(), vmax=shared_array.max(), origin='lower',
                        extent=[self.x.min(), self.x.max(), self.y.min(),self.y.max()])
          ax.set_xlabel(r"$Xpos$")
          ax.set_ylabel(r"$Ypos$")
          ax.xaxis.set_label_position('top')
          ax.xaxis.set_tick_params(labeltop='on')
          cax = divider.append_axes("right", size="5%", pad=0.05)
          cbar = fig.colorbar(im,cax=cax, ticks=list(np.linspace(shared_array.max(), shared_array.min(),20)),format='$%.2f$')
          cbar.ax.tick_params(labelsize=8) 
          plt.savefig('/users/Desktop/MassRecons/Likelihood2d_XY_coordinate.pdf', transparent=True, bbox_inches='tight', pad_inches=0)
          plt.close()

if __name__ == '__main__':
     Xgal = np.linspace(Xgalaxy.min(), Xgalaxy.max(), 1000)
     Ygal = np.linspace(Ygalaxy.min(), Ygalaxy.max(), 1000)          
     Test=LikelihoodTest(Xgal,Ygal) 
     x=Test.run()
     Test.plotLikelihood(x)

Now it is working like a charm! :)

Dalek
  • This works because `pathos.multiprocessing` uses a more robust serializer. – Mike McKerns Aug 19 '14 at 18:21
  • @MikeMcKerns Do you know why I am getting [this error message](http://stackoverflow.com/questions/25389854/pathos-multiprocessing-raised-interrupted-system-call-message)? – Dalek Aug 19 '14 at 18:28
  • your code does not run, as posted above. You haven't defined all the global variables. – Mike McKerns Aug 19 '14 at 18:42
  • @MikeMcKerns The code in the answer does work, and I even got the plot, but surprisingly it also raised an error message. – Dalek Aug 19 '14 at 18:46
  • Right, but what I mean, is I can't copy-paste and run your code as above. – Mike McKerns Aug 19 '14 at 18:47
  • @MikeMcKerns there is a function called `reduced_shear` that I didn't post because it is very complicated and takes a lot of inputs. – Dalek Aug 19 '14 at 18:51
  • Debugging doesn't always require good physics. :) You could fake it. – Mike McKerns Aug 19 '14 at 18:53
  • @MikeMcKerns True, but I am wondering whether it is just a bug in the `multiprocessing` library or whether I have made a mistake that happens not to crash the code this time. – Dalek Aug 19 '14 at 19:06
-3

You can't pass functions or methods to different processes using Pickle, but you can pass strings.

You can maintain a dictionary of methods and refer to the methods through their string keys. That's not very elegant, but it solves the problem.

EDIT: When you use multiprocessing, there is an implicit "fork". This creates multiple independent processes with no shared resources; because of this, everything you pass to another process must be serialized with Pickle. The problem is that pickle doesn't allow executable code to be serialized and sent to another process.

castarco
  • Your answer is a bit vague to me. Before this, I was passing `like2d` to multiprocessing when it was not a method of a class, and it worked perfectly. – Dalek Aug 19 '14 at 11:44
  • When you use multiprocessing, there is an implicit "fork". This creates multiple independent processes with no shared resources; because of this, everything you pass to another process must be serialized with Pickle. The problem is that pickle doesn't allow executable code to be serialized and sent to another process. – castarco Aug 19 '14 at 11:47
  • That's not strictly true. See [this answer](http://stackoverflow.com/a/1816969/1142167) and [this answer](http://stackoverflow.com/a/8805244/1142167) as well as the [python docs on pickling](https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled) – Joel Cornett Aug 19 '14 at 12:38
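
The point made in the last comment can be checked with the standard library alone. This is a small illustrative sketch, not code from the thread: `make_nested` and `can_pickle` are hypothetical helper names, and `json.dumps` is used only as a convenient example of a function that is importable by name. Pickle stores such a function as a reference (module name plus function name), not as executable code, which is why a nested function with no importable name fails.

```python
import json
import pickle


def make_nested():
    # Returns a function that exists only inside this call (no importable name).
    def nested(x):
        return x + 1
    return nested


def can_pickle(obj):
    # True if pickle.dumps succeeds; the exact exception type varies by version.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# A function importable by name: the payload records "json" and "dumps" only.
payload = pickle.dumps(json.dumps)
by_name_ok = pickle.loads(payload) is json.dumps

# A nested function cannot be looked up by name, so pickling it fails.
nested_ok = can_pickle(make_nested())

print(by_name_ok, nested_ok)
```

This matches the traceback in the question: `my_func` was defined inside `ShearLikelihood`, so `pool.map` could not pickle it.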