
The code shown here is simplified but triggers the same PicklingError. I know there is a lot of discussion about what can and cannot be pickled, but I did not find a solution there.

I wrote a simple Cython script with the following function:

def pow2(int a):
    return a**2

Compilation works, and I can call this function from a Python script.

However, I am wondering how to use this function with multiprocessing:

import numpy as np
from multiprocessing import Pool
from fast import pow2

p = Pool(processes=4)
y = p.map(pow2, np.arange(10, dtype=int))

This gives me a PicklingError (the traceback was attached as a screenshot).

dtw is the name of the package, and fast is fast.pyx.

How can I get around this problem? Thanks in advance

Saullo G. P. Castro
fast tooth
  • It turns out I simply need to wrap the Cython function in a plain Python function. If you have a similar problem, see: http://stackoverflow.com/questions/4827432/how-to-let-pool-map-take-a-lambda-function – fast tooth Jun 10 '14 at 14:45
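
A minimal sketch of the wrapper approach mentioned in the comment above, assuming the compiled module is importable as fast. The names pow2_wrapper and parallel_pow2 are hypothetical, and the pure-Python fallback exists only so the sketch runs without the compiled extension. Whether a wrapper is enough in a given setup is taken from the asker's comment, not verified here.

```python
from multiprocessing import Pool

try:
    # The compiled Cython function from the question (fast.pyx).
    from fast import pow2 as _pow2
except ImportError:
    # Assumption: pure-Python stand-in used only when the extension is missing.
    def _pow2(a):
        return a ** 2


def pow2_wrapper(a):
    """Plain module-level Python function, so it can be pickled by name."""
    # int() converts NumPy integer scalars to a Python int before the call.
    return _pow2(int(a))


def parallel_pow2(values, processes=4):
    """Map pow2_wrapper over values with a pool of worker processes."""
    with Pool(processes=processes) as pool:
        return pool.map(pow2_wrapper, values)
```

Call parallel_pow2(np.arange(10, dtype=int)) from inside an `if __name__ == "__main__":` block, so that worker processes can import the module without re-running the pool setup.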

1 Answer


Instead of using multiprocessing, which has to pickle the data and copy it between processes, you can use prange, Cython's wrapper around OpenMP. In your case you could use it as shown below.

  • note the use of x*x instead of x**2, which avoids the function call pow(x, 2)
  • a part of the array is passed to each thread, using pointers to double (double *)
  • the last thread takes more values when size % num_threads != 0

Code:

#cython: wraparound=False
#cython: boundscheck=False
#cython: cdivision=True
#cython: nonecheck=False
#cython: profile=False
import numpy as np
cimport numpy as np
from cython.parallel import prange

# Square `size` doubles from inp into out; runs without the GIL.
cdef void cpow2(int size, double *inp, double *out) nogil:
    cdef int i
    for i in range(size):
        out[i] = inp[i]*inp[i]

def pow2(np.ndarray[np.float64_t, ndim=1] inp,
         np.ndarray[np.float64_t, ndim=1] out,
         int num_threads=4):
    cdef int thread
    cdef np.ndarray[np.int32_t, ndim=1] sub_sizes, pos
    size = np.shape(inp)[0]
    # every thread gets size//num_threads elements ...
    sub_sizes = np.zeros(num_threads, np.int32) + size//num_threads
    pos = np.zeros(num_threads, np.int32)
    # ... and the last thread also takes the remainder
    sub_sizes[num_threads-1] += size % num_threads
    # starting offset of each thread's chunk
    pos[1:] = np.cumsum(sub_sizes)[:num_threads-1]
    # one prange iteration per thread, each working on its own chunk
    for thread in prange(num_threads, nogil=True, chunksize=1,
                         num_threads=num_threads, schedule='static'):
        cpow2(sub_sizes[thread], &inp[pos[thread]], &out[pos[thread]])

def main():
    a = np.arange(642312323).astype(np.float64)
    pow2(a, out=a, num_threads=4)
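
The chunk bookkeeping in pow2 above can be checked on its own in plain NumPy. This is only an illustrative sketch: chunk_layout is a hypothetical helper name, and int64 is used instead of int32 for simplicity.

```python
import numpy as np


def chunk_layout(size, num_threads):
    """Return (sub_sizes, pos): chunk lengths and start offsets per thread."""
    sub_sizes = np.full(num_threads, size // num_threads, dtype=np.int64)
    sub_sizes[-1] += size % num_threads      # last chunk absorbs the remainder
    pos = np.zeros(num_threads, dtype=np.int64)
    pos[1:] = np.cumsum(sub_sizes)[:-1]      # start offset of each chunk
    return sub_sizes, pos


sub_sizes, pos = chunk_layout(10, 4)
print(sub_sizes)  # [2 2 2 4]
print(pos)        # [0 2 4 6]
```

With 10 elements and 4 threads, the first three threads process 2 elements each and the last one processes 4, so the chunks cover the array exactly once.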
Saullo G. P. Castro
  • Thanks for your answer, I tried that. prange doesn't work for me because I use NumPy-style slicing inside the loop, which creates Python objects, so the GIL has to be held. My input is actually a list of vectors of unequal length, which I put in a big 2D NumPy array padded with zeros for missing values; the second input is an int array giving the length of each vector in the first. Is there a better way? Thanks – fast tooth Jun 10 '14 at 14:49
  • @fasttooth, but the input you described should be no problem.... when you put them in the big `2d-array` you can use the pointers as well, but in this case the pointer will be `&a[row, col]`... – Saullo G. P. Castro Jun 10 '14 at 14:51
  • Doesn't &a[row, col] point to the address of the (row, col) element? Do I have to write a C-like loop to copy from the 2D array into a local C array? – fast tooth Jun 10 '14 at 14:57
  • @fasttooth `&a[row, col]` will give the address of an element, which is what you need, when you do `double *b; b = &a[row, col]`, `b` is a `double` buffer starting at element `(row, col)`, such that `b[0]==a[row, col]`; and `b[1]==a[row, col+1]` for `C` memory layout or `b[1]==a[row+1, col]` for `Fortran` memory layout – Saullo G. P. Castro Jun 10 '14 at 15:02
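
The layout rule in the last comment can be illustrated from Python without any pointers. This is a rough sketch in which the flat arrays mem_c and mem_f stand in for the raw double buffer, and all names are made up for the example.

```python
import numpy as np

a_c = np.arange(12, dtype=np.float64).reshape(3, 4)  # C (row-major) layout
a_f = np.asfortranarray(a_c)                         # same values, column-major

row, col = 1, 2

# Flat views of the data in the order it is actually stored in memory.
mem_c = a_c.ravel(order="K")
mem_f = a_f.ravel(order="K")

# Offset, in elements, of a[row, col] from the start of each buffer.
off_c = (row * a_c.strides[0] + col * a_c.strides[1]) // a_c.itemsize
off_f = (row * a_f.strides[0] + col * a_f.strides[1]) // a_f.itemsize

# b[0] in the comment corresponds to mem[off], b[1] to mem[off + 1].
print(mem_c[off_c], mem_c[off_c + 1])  # a[1, 2] then a[1, 3]
print(mem_f[off_f], mem_f[off_f + 1])  # a[1, 2] then a[2, 2]
```

For a zero-padded C-layout array, this means each row is one contiguous run, so `&a[row, 0]` together with that row's length describes one vector without copying.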