9

I am in the process of migrating from MATLAB to Python, mainly because of the vast number of interesting machine learning packages available in Python. One issue that has been a source of confusion for me is parallel processing. In particular, I want to read thousands of text files from disk in a for loop, and I want to do it in parallel. In MATLAB, using parfor instead of for does the trick, but so far I haven't been able to figure out how to do this in Python. Here is an example of what I want to do. I want to read N text files, shape each one into an N1xN2 array, and save each one into an NxN1xN2 numpy array, which is what I return from a function. Assuming the file names are file_0000.dat, file_0001.dat, etc., the code I would like to parallelise is as follows:

import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
for counter in range(N):
    t_str="%.4d" % counter        
    filename = 'file_'+t_str+'.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape=[N1,N2]
    result[counter,:,:]=temp_array

I run the code on a cluster, so I can use many processors for the job. Hence, any comment on which parallelisation method is most suitable for my task (if there is more than one) is most welcome.

NOTE: I am aware of this post, but in that post, there are only out1, out2, out3 variables to worry about, and they have been used explicitly as arguments of a function to be parallelised. But here, I have many 2D arrays that should be read from file and saved into a 3D array. So, the answer to that question is not general enough for my case (or that is how I understood it).

Vahid S. Bokharaie
  • I think you can achieve what you want using this http://stackoverflow.com/a/9786266/1040597 – Saeid Jun 19 '15 at 12:35
  • In that post, it is assumed the `for` loop merely executes a function called `calc_stuff`. But is there a way to parallelise the `for` loop without defining a whole new function which incorporates what the for loop is supposed to do? – Vahid S. Bokharaie Jun 19 '15 at 12:47
  • I don't know if there is something exactly like what you want. But if you put the body of your 'for' loop in a function that receives counter as an input and do exactly as the mentioned post says you can get what you want. – Saeid Jun 19 '15 at 12:53

2 Answers

5

You still probably want to use multiprocessing, just structure it a bit differently:

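from multiprocessing import Pool

import numpy as np

N = 10000
N1 = 200
N2 = 100

def myshaper(fname):
    # Load one text file and reshape it to N1 x N2.
    # This is a module-level function because multiprocessing cannot pickle a lambda.
    return np.loadtxt(fname).reshape([N1, N2])

if __name__ == '__main__':
    result = np.empty([N, N1, N2])
    filenames = ('file_%.4d.dat' % i for i in range(N))

    pool = Pool()
    # imap yields results in input order, so i matches the file index
    for i, temp_array in enumerate(pool.imap(myshaper, filenames)):
        result[i, :, :] = temp_array
    pool.close()
    pool.join()

What this does is first get a generator for the file names in filenames. This means the file names are not all stored in memory at once, but you can still loop over them. Before that, it defines a small function, myshaper, that loads and reshapes one file. A lambda (the equivalent of an anonymous function in MATLAB) would be shorter, but multiprocessing has to pickle the function to send it to the worker processes, and lambdas cannot be pickled. Then it applies that function to each file name using multiple processes, and puts each result in the overall array. Finally it closes the pool and waits for the worker processes to finish.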
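Pool.imap has to pickle the function it sends to the worker processes, which is why a lambda, or a function nested inside another function, tends to fail with a PicklingError. A minimal sketch for checking this ahead of time, assuming nothing beyond the standard library (is_picklable and double are hypothetical names used only for illustration, not part of multiprocessing):

```python
import pickle


def is_picklable(obj):
    # Hypothetical helper: True if the standard pickle module can serialise obj.
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def double(x):
    # A module-level function is pickled by reference to its name.
    return 2 * x


print(is_picklable(double))
print(is_picklable(lambda x: 2 * x))
```

When run as a script, the module-level function should pass the check and the lambda should not, so moving the loop body into a named top-level function is usually the simplest way out.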

This version uses somewhat more idiomatic Python. However, an approach that is closer to your original code (although less idiomatic) might be easier to follow:

from multiprocessing import Pool

import numpy as np

N=10000
N1=200
N2=100
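
def proccounter(counter):
    # Build the file name, load the file and reshape it to N1 x N2.
    t_str = "%.4d" % counter
    filename = 'file_' + t_str + '.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape = [N1, N2]
    return counter, temp_array

if __name__ == '__main__':
    # The guard keeps worker processes from re-running this block on import.
    result = np.empty([N, N1, N2])
    pool = Pool()
    for counter, temp_array in pool.imap(proccounter, range(N)):
        result[counter, :, :] = temp_array
    pool.close()
    pool.join()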

This just splits most of your for loop into a function, applies that function to each element of the range using multiple processes, then puts the results into the array. It is basically your original code with the for loop split in two: the loading runs inside proccounter in the worker processes, and the remaining loop only copies each returned array into result.
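If you would rather not rely on the globals N1 and N2, the same pattern can be wrapped in a function. This is only a sketch under a few assumptions: make_filename, load_one and load_stack are hypothetical names, and the pool is passed in as an argument so that anything with an imap method works (multiprocessing.Pool for real processes, or multiprocessing.pool.ThreadPool when you just want to check the logic). functools.partial is used to fix the shape arguments because, unlike a lambda, a partial of a module-level function can be pickled.

```python
from functools import partial

import numpy as np


def make_filename(counter):
    # Same naming scheme as in the question: file_0000.dat, file_0001.dat, ...
    return 'file_%.4d.dat' % counter


def load_one(filename, n1, n2):
    # Read one text file and reshape its contents to n1 x n2.
    return np.loadtxt(filename).reshape(n1, n2)


def load_stack(filenames, n1, n2, pool):
    # pool is any object with an imap method, e.g. multiprocessing.Pool.
    filenames = list(filenames)
    result = np.empty((len(filenames), n1, n2))
    loader = partial(load_one, n1=n1, n2=n2)
    # imap yields results in input order, so i is the index of the file.
    for i, temp_array in enumerate(pool.imap(loader, filenames)):
        result[i, :, :] = temp_array
    return result
```

To use it with processes, create the Pool under an if __name__ == '__main__': guard, call load_stack((make_filename(i) for i in range(N)), N1, N2, pool), then close and join the pool as before.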

TheBlackCat
  • Thanks for your answer. I tried your method (the less idiomatic one), and I got the same error: PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed. I looked it up, and as I understand it, I should have this parallelised code at the beginning of a function, or incorporate the for loop into a new function. And this is something I am trying to avoid. Do you have any idea how to get rid of this error? – Vahid S. Bokharaie Jun 23 '15 at 09:34
  • The for loop is in a function, on its 70th line. But I have given up trying to parallelise the code. It took approx. an hour to run it to read the three sets of files that I had. And the time I have wasted on trying to parallelise it is already more than the potential gain for my other cases. – Vahid S. Bokharaie Jul 03 '15 at 09:41
4

It can be done using the joblib library as follows:

def par_func(N1, N2, counter):
    import numpy as np
    t_str="%.4d" % counter   
    filename = 'file_'+t_str+'.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape=[N1,N2]
    # temp_array = np.random.randn(N1, N2)  # use this line to test
    return temp_array

if __name__ == '__main__':
    import numpy as np

    N=1000
    N1=200
    N2=100

    from joblib import Parallel, delayed
    num_jobs = 2
    output_list = Parallel(n_jobs=num_jobs)(delayed(par_func) 
                                            (N1, N2, counter)
                                            for counter in range(N)) 

    output_array = np.array(output_list)
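
The last line works because np.array stacks a list of equally shaped 2D arrays along a new first axis, so the list of N arrays of shape N1xN2 becomes one NxN1xN2 array. A small check of just that step, with made-up sizes and constant arrays standing in for the values returned by par_func (no files or joblib involved):

```python
import numpy as np

N, N1, N2 = 5, 4, 3

# Stand-ins for the arrays that par_func would return, one per counter.
output_list = [np.full((N1, N2), float(counter)) for counter in range(N)]

# A list of N arrays of shape (N1, N2) becomes one array of shape (N, N1, N2).
output_array = np.array(output_list)
print(output_array.shape)
```

Since joblib's Parallel returns its results in the order the tasks were submitted, output_array[counter] should correspond to the file with that counter.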
Vahid S. Bokharaie