
My problem is to execute something like:

multicore_apply(serie, func)

to run `func` over a pandas Series on several cores.

So I tried to create a function that does it.

The function used to run the apply method in a worker process:

from multiprocessing import Process, Queue
import pandas as pd

def adaptator(func, queue):
    serie = queue.get().apply(func)
    queue.put(serie)

The process management:

def parallel_apply(ncores, func, serie):
    # Split the Series into ncores strided chunks, one per worker queue.
    series = [serie[i::ncores] for i in range(ncores)]
    queues = [Queue() for i in range(ncores)]
    for _serie, queue in zip(series, queues):
        queue.put(_serie)
    result = []
    jobs = []
    for i in range(ncores):
        jobs.append(Process(target=adaptator, args=(func, queues[i])))
    for job in jobs:
        job.start()
    for queue, job in zip(queues, jobs):
        job.join()
        result.append(queue.get())
    return pd.concat(result, axis=0).sort_index()

I know the `i::ncores` slicing is not optimal, but that is actually not the problem: if the input length is greater than 30000, the processes never stop...

Is that a misunderstanding of Queue()?

I don't want to use multiprocessing.Pool.map: the func to apply is a method of a very complex and rather large class, so shared memory makes it just too slow. Here I want to pass it through a queue once this process problem is solved.
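For context on why the workers never stop: the multiprocessing documentation ("Joining processes that use queues") warns that a process which has put items on a Queue will not terminate until all buffered items have been flushed to the underlying pipe, so joining a worker before draining its queue can deadlock once the results get large; that would explain a size threshold like the 30000 rows observed here. Below is a minimal sketch of the get-before-join ordering; the names are mine, and separate input/output queues are used only so the parent never races the worker for the chunk it just put:

from multiprocessing import Process, Queue
import pandas as pd

def _worker(func, in_queue, out_queue):
    # Read the chunk, apply func, send the result back on its own queue.
    chunk = in_queue.get()
    out_queue.put(chunk.apply(func))

def parallel_apply_sketch(ncores, func, serie):
    chunks = [serie[i::ncores] for i in range(ncores)]
    in_queues = [Queue() for _ in range(ncores)]
    out_queues = [Queue() for _ in range(ncores)]
    jobs = [Process(target=_worker, args=(func, iq, oq))
            for iq, oq in zip(in_queues, out_queues)]
    for job in jobs:
        job.start()
    for chunk, iq in zip(chunks, in_queues):
        iq.put(chunk)
    # Drain the result queues *before* joining: a child that has put a
    # large object on a Queue does not exit until that data has been
    # flushed to the pipe, so join()-before-get() can hang.
    results = [oq.get() for oq in out_queues]
    for job in jobs:
        job.join()
    return pd.concat(results, axis=0).sort_index()

The reordering is the point; the split into input and output queues just avoids the parent and the worker both calling get() on the same queue.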

Thank you for your advice.

Jean-Baptiste F.
  • What do you mean by "*I know the i::ncores is not optimized but actually it's not the problem : if the input len is greater than 30000 the processes never stop..*"? What is input len, `ncores`? What do you think is the time and space complexity of your code. Does 29999 work, or did you jump from 10k to 30k, and just suspect it doesn't finish? You have problem with scaling, and don't provide any data to make any suggestion. – luk32 Jun 06 '16 at 12:22
  • It's not very smart to use the i::ncores selection because I need to sort to get ordered results; it would be better to do it block by block. The input len is the length of the serie. I know it breaks at 30000 because I tested it in steps of 500. ncores is the number of concurrent processes to launch. – Jean-Baptiste F. Jun 06 '16 at 13:57
  • Important information: my OS is Linux (openSUSE). – Jean-Baptiste F. Jun 06 '16 at 14:15

2 Answers


Maybe this will help: you can use the multiprocessing library. Your multicore_apply(serie, func) could look like this:

from multiprocessing import Pool
pool = Pool()
result = pool.map(func, serie)
pool.terminate()

You can specify the number of processes to create, e.g. pool = Pool(6); by default it equals the number of cores on the machine.
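To make that concrete, here is a minimal, self-contained sketch of the suggestion applied element-wise to a pandas Series; the multicore_apply wrapper, the square helper and the index restoration are additions for illustration, not part of the original answer:

from multiprocessing import Pool
import pandas as pd

def square(x):
    # func must be picklable, e.g. a module-level function like this one.
    return x * x

def multicore_apply(serie, func, processes=None):
    # Map func over the Series values in a pool of worker processes,
    # then rebuild a Series with the original index.
    with Pool(processes=processes) as pool:
        values = pool.map(func, serie)
    return pd.Series(values, index=serie.index)

if __name__ == '__main__':
    s = pd.Series(range(10))
    print(multicore_apply(s, square, processes=4))

Note that Pool.map still has to pickle func to send it to the workers, which is exactly what the question is trying to avoid for a heavyweight bound method.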

George Petrov
  • Thank you, but my question was process-specific; I am already OK with pool.map and I have made some functions that use it together with numpy.vectorize. – Jean-Baptiste F. Jun 06 '16 at 14:07

After many nights of intense searching, I solved the problem thanks to a post on the Python development website about the maximum size of an object in a queue: that was the problem. I also used another post on Stack Overflow, found here:

Then I wrote the following program, but it is not as efficient as expected for large objects. I will make the same available for every axis.

Note that this version allows using a complex class as the function argument, which I cannot do with pool.map.

from multiprocessing import Manager, Process
import numpy as np

def adaptator(series, results, ns, i):
    # Worker i: take its block from the shared list, apply the shared func,
    # and write the result back into the shared results list.
    serie = series[i]
    func = ns.func
    result = serie.apply(func)
    results[i] = result

def parallel_apply(ncores, func, serie):
    # Split into contiguous blocks and share the chunks, the results and
    # func through a Manager instead of pushing large objects through queues.
    series = np.array_split(serie, ncores, axis=0)
    M = Manager()
    s_series = M.list()
    s_series.extend(series)
    results = M.list()
    results.extend([None] * ncores)
    ns = M.Namespace()
    ns.func = func
    jobs = []
    for i in range(ncores):
        jobs.append(Process(target=adaptator, args=(s_series, results, ns, i)))
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
    print(results)

So if you pass large objects through queues, IPython freezes.
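A minimal driver for this version, assuming the adaptator and parallel_apply defined just above are in scope; np.sqrt is only a stand-in for the heavyweight method, and the commented concat line shows one way to turn the managed results list back into a single Series:

import numpy as np
import pandas as pd

if __name__ == '__main__':
    serie = pd.Series(np.random.rand(50000))
    # Fills the managed `results` list and prints its proxy. To return a
    # single Series instead, replace the final print in parallel_apply with:
    #     return pd.concat(list(results), axis=0).sort_index()
    parallel_apply(4, np.sqrt, serie)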

Jean-Baptiste F.