0

I am completely new to python multiprocessing and a bit overwhelmbed by the vast amount of online resources, so I kinda wanna a more clear-cut approach from here. My code looks like the following: the two functions, forward and backward are very computaionally expensive. On my input dataset, each takes about 13mins. I want to compute the two matrices (forward and backward, see the 3rd and 4th line of code in decode() function) simultaneouly. I looked into some online tutorials and I was thinking I could use multiprocessing.process to do this. However, I am not sure how to retrieve the numpy array. I know there are things like Queue, Array, but these seem quite restrictive in their usage and doesn't seem to be suitable here. Thanks in advance! '''

def forward(self, emis):
    # Given the observed haplotype, compute its forward matrix
    f = np.full((self.n1+self.n2, self.numSNP), np.nan)
    # initialization
    f[:,0] = (-math.log(self.n1+self.n2) + emis[0]).flatten()

     # fill in forward matrix
    for j in range(1, self.numSNP):
        T = self.transition(self.D[j])
        # using axis=1, logsumexp sum over each column of the transition matrix
        f[:, j] = emis[j] + logsumexp(f[:,j-1][:,np.newaxis] + T, axis=0)
    return f


#@profile
def backward(self, emis):
    # Given the observed haplotype, compute its backward matrix
    b = np.full((self.n1+self.n2, self.numSNP), np.nan)
    # initialization
    b[:, self.numSNP-1] = np.full(self.n1+self.n2, 0)

    for j in range(self.numSNP-2, -1, -1):
        T = self.transition(self.D[j+1])
        b[:,j] = logsumexp(T + emis[j+1] + b[:,j+1], axis=1)
    return b


#@profile
def decode(self, obs):
    # infer hidden state of each SNP sites in the given haplotype
    # state[j] = 0 means site j was most likely copied from population 1 
    # and state[j] = 1 means site j was most likely copies from population 2

    start = time.time()
    emis = self.emissionALL(obs)
    f = self.forward(emis)
    b = self.backward(emis)
    end= time.time()
    print(f'uncached version takes time {end-start}')
    print(f'forward probability:{logsumexp(f[:,-1])}')
    print(f'backward probability:{logsumexp(-math.log(self.n1+self.n2)+emis[0]+b[:,0])}')
    return 0

'''

Yilei Huang
  • 77
  • 1
  • 7

1 Answers1

0

I am not sure what is restrictive about Array for multiprocessing if you are just using a matrix anyway. Its not complete, but this would be the idea.

from multiprocessing.sharedctypes import RawArray

#make some empty arrays 
yourMat = RawArray('d', X_size) 
resultMat = RawArray('d', X_size) 

...
ptemp=multiprocessing.Process(target=backward, args=(yourMat ,resultMat ))
ptemp.daemon=True
ptemp.start()

...

data = np.frombuffer(yourMat, dtype=np.float64)
#do something with data
resultMat [i:j] = data 

...

#get the data
results = np.frombuffer(resultMat , dtype='i')

You can check this post for a complete example: Use numpy array in shared memory for multiprocessing

user-2147482637
  • 2,115
  • 8
  • 35
  • 56