
I am trying to parallelize a function that takes multiple constant arguments. So far I have been able to run it, but the work is not actually being done in parallel. How should I approach this?

I tried to do the following:

import numpy as np
import multiprocessing


def optm(hstep, astep, time_ERA):

    # this is a secondary function where I get arrays from a dataset
    data = checkdate(time_ERA, 2, 4)
    zlevels = data[0]
    pottemp = data[1]

    for z1 in np.linspace(0, zlevels[-1], hstep):
        for z2 in np.linspace(0, zlevels[-1], hstep):
            for a1 in np.linspace(0, 0.01, astep):  # max angle
                for a2 in np.linspace(0, 0.01, astep):
                    for a3 in np.linspace(0, 0.01, astep):
                        # this function does all the heavy math, so it takes a long time to compute
                        result_array = another_function(zlevels, pottemp, z1, z2, a1, a2, a3)

    return result_array

Then I parallelized the function this way:

input_list = [(hstep, astep, time_ERA)]  # create a list with one tuple of arguments for starmap

pool = multiprocessing.Pool()
result = pool.starmap(optm, input_list)
pool.close()

When I run it, it takes longer than without the parallelization. It is my first time trying to parallelize code, so I am still not sure whether I should use map or starmap and how to parallelize it properly.

Murali
Michell
  • `[(hstep,astep,time_ERA)]` is one input. Can’t really run one function call in parallel. – Ry- Aug 21 '19 at 08:48
  • you have to give number of processes to create to the Pool class, otherwise it takes default 1 process. check the number physical processes in your machine using psutil, give twice as that number of processes to create – Murali Aug 21 '19 at 08:48
  • you should parallelize `another_function` instead of `optm`. I would create a multiprocessing queue, and instead of the function call, append jobs to the queue. Then launch `x` processes, where `x` is `multiprocessing.cpu_count` and feed the queue to each process. Inside each process call the function with the given params. – Dschoni Aug 21 '19 at 08:53
  • @Dschoni should I parallelize ```another_function``` inside of ```optm```? – Michell Aug 21 '19 at 08:56
  • You can, but you don't have to. Look at this example (https://stackoverflow.com/questions/20887555/dead-simple-example-of-using-multiprocessing-queue-pool-and-locking) for using pools. You could also use the `apply_async` method of a pool instead. I can answer with a detailed example if needed. – Dschoni Aug 21 '19 at 09:00
  • @Dschoni if you could show me with an example, it'd be really helpful. It's my first time using multiprocessing – Michell Aug 21 '19 at 09:06
  • 1
    BTW: You override `result_array` in each iteration. – Dschoni Aug 21 '19 at 09:09
  • 1
    note: using [vectorising/broadcasting in numpy](https://docs.scipy.org/doc/numpy/user/theory.broadcasting.html) will probably be at least a couple of orders of magnitude faster than multiprocessing. but it would depend on details `another_function` (and might require altering it) so could be a more invasive change. that said, if you're after performance of numeric code vectorisation is generally the first place to look – Sam Mason Aug 21 '19 at 11:41
  • Adding to @SamMason that OP is probably asking an x-y question. Vectorisation is indeed usually orders of magnitudes faster than iteration in loops. Can you show the implementation of `another_function`? – Dschoni Aug 21 '19 at 15:06
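The broadcasting approach suggested in the last two comments could look roughly like this. This is only a sketch: it assumes `another_function` can be rewritten to operate on whole arrays at once, and it reuses the names from the question.

import numpy as np

# Build the full parameter grids that the five nested loops would visit.
z1, z2, a1, a2, a3 = np.meshgrid(
    np.linspace(0, zlevels[-1], hstep),
    np.linspace(0, zlevels[-1], hstep),
    np.linspace(0, 0.01, astep),
    np.linspace(0, 0.01, astep),
    np.linspace(0, 0.01, astep),
    indexing="ij",
)

# A vectorised another_function would then work on all combinations in one call
# instead of hstep**2 * astep**3 separate Python-level calls.
result_array = another_function(zlevels, pottemp, z1, z2, a1, a2, a3)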

1 Answer


Using the minimal example from my comment adapted to your problem:

import multiprocessing
import time
import numpy as np

def optm(hstep,astep,time_ERA):
    values = []
    #this is a secondary function where I get arrays from a dataset
    data = checkdate(time_ERA,2,4)
    zlevels=data[0] 
    pottemp=data[1] 
    for z1 in np.linspace(0,zlevels[-1],hstep):
        for z2 in np.linspace(0,zlevels[-1],hstep):
            for a1 in np.linspace(0,0.01,astep): # max angle
                for a2 in np.linspace(0,0.01,astep):
                    for a3 in np.linspace(0,0.01,astep):
                        values.append([zlevels,pottemp,z1,z2,a1,a2,a3])
    return values

def mp_worker(params):
    # unpack the parameter list built by optm (Python 3 no longer allows tuple
    # unpacking in the function signature)
    zlevels, pottemp, z1, z2, a1, a2, a3 = params
    temp = another_function(zlevels, pottemp, z1, z2, a1, a2, a3)
    # do something with the result

def mp_handler(data):
    p = multiprocessing.Pool(2) # Change 2 to your cpu count
    p.map(mp_worker, data)

if __name__ == '__main__':
    data = optm(hstep,astep,time_ERA) 
    mp_handler(data)
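If you would rather not unpack the tuple inside the worker, `Pool.starmap` (the method the question asks about) can do the unpacking for you. A sketch under the same assumptions as the code above:

def mp_handler(data):
    with multiprocessing.Pool(2) as p:  # change 2 to your cpu count
        # starmap unpacks each parameter list, so another_function receives
        # zlevels, pottemp, z1, z2, a1, a2, a3 as separate arguments
        results = p.starmap(another_function, data)
    return results

The only difference between `map` and `starmap` is how arguments are passed: `map` calls the worker with one argument (the whole tuple), while `starmap` unpacks the tuple into separate arguments.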

Instead of mapping, you could call `pool.apply_async()` for each set of parameters, or use a multiprocessing queue to feed jobs to the subprocesses. I assume the output needs to be stored in a single array, so queues make that a lot easier: feed jobs into one queue, push the results onto another, and when all processes are done, collect the results from the result queue in the main process and store them in an array.
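A minimal sketch of that queue-based pattern (the helper names and the `None` sentinel are illustrative, not part of the code above):

def queue_worker(job_queue, result_queue):
    # each worker pulls parameter sets until it sees the sentinel None
    for params in iter(job_queue.get, None):
        zlevels, pottemp, z1, z2, a1, a2, a3 = params
        result_queue.put(another_function(zlevels, pottemp, z1, z2, a1, a2, a3))

def run_with_queues(data, n_procs=multiprocessing.cpu_count()):
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=queue_worker, args=(job_queue, result_queue))
               for _ in range(n_procs)]
    for w in workers:
        w.start()
    for params in data:    # feed the jobs
        job_queue.put(params)
    for _ in workers:      # one sentinel per worker so every process terminates
        job_queue.put(None)
    results = [result_queue.get() for _ in data]  # drain results before joining to avoid blocking
    for w in workers:
        w.join()
    return results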

Dschoni
  • I think you forgot to return processed data from mp_handler – kederrac Aug 21 '19 at 09:39
  • 1
    Also, ```data.append([...])```returns this error ```AttributeError: 'tuple' object has no attribute 'append'```. I'm trying to fix it and I will post once I figure it out – Michell Aug 21 '19 at 09:57
  • @rusu_ro1 see my comment in the code. Result must be handled inside `mp_worker`. – Dschoni Aug 21 '19 at 10:13
  • @Dschoni that's the closest answer I have until now, but when I try to append the data it is still returning the same error. I believe it will be difficult to append it because ```zlevels``` and ```pottemp``` are arrays while ```z1,z2,a1,a2 and a3``` are the same size. – Michell Aug 21 '19 at 10:57
  • It's because I am stupid. You have declared data yourself after my initialisation. I'll edit again. – Dschoni Aug 21 '19 at 14:57