3

I read an old question Why does this python multiprocessing script slow down after a while? and many others before posting this one. They do not answer the problem I'm having.

IDEA OF THE SCRIPT. The script generates arrays, 256x256, in a serialised loop. Elements of an array are calculated one-by-one from a list that contains dictionaries with relevant params, one dictionary per an array element (256x256 in total per a list). The list is the way for me to enable parallel calculations.

THE PROBLEM. In the beginning, the generation of the data speeds up from a dozen seconds up-to a few seconds. Then, after a few iterations, it starts slowing down a fraction of a second with each new array generated to the point it takes forever to calculate anything.

Additional info.

  1. I am using a pool.map function. After making a few small changes to identify which element is being calculated, I also tried using map_async. Unfortunately, it is slower because I need to init the pool each time I finish calculating an array.
  2. When using the pool.map, I init the pool once before anything starts. In this way, I hope to save time initializing the pool in comparison to map_async.
  3. CPU shows low usage, up to ~18%.
  4. In my instance, a hard-drive isn't a bottleneck. All the data necessary for calculations is in RAM. I also do not save data onto a hard-drive keeping everything in RAM.
  5. I also checked if the problem persists if I use a different number of cores, 2-24. No changes either.
  6. I made some additional tests by running and terminating a pool, a. each time an array is generated, b. every 10 arrays. I noticed that in each case execution of the code slows down compared to the previous pool's execution time, i.e. if the previous slowed down to 5s, another one will be 5.Xs and so on. The only time the execution doesn't slow down is when I run the code serially.
  7. Working env: Windows 10, Python 3.7, conda 4.8.2, Spyder 4.

THE QUESTION: Why multiprocessing slows down after a while in the case where only CPU & RAM are involved (no hard-drive slowdown)? Any idea?


UPDATED CODE:

import multiprocessing as mp 
from tqdm import tqdm
import numpy as np
import random

def wrapper_(arg):
    return tmp.generate_array_elements(
        self=arg['self'], 
        nu1=arg['nu1'], 
        nu2=arg['nu2'], 
        innt=arg['innt'], 
        nu1exp=arg['nu1exp'], 
        nu2exp=arg['nu2exp'], 
        ii=arg['ii'], 
        jj=arg['jj'],
        llp=arg['self'].llp, 
        rr=arg['self'].rr, 
    )

class tmp:
    def __init__(self, multiprocessing, length, n_of_arrays):
        self.multiprocessing = multiprocessing
        self.inshape = (length,length)
        self.length = length
        self.ll_len = n_of_arrays
        self.num_cpus = 8
        self.maxtasksperchild = 10000
        self.rr = 0
        
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_ll(self, lp): 
        return [random.sample((range(self.length)),int(np.random.random()*12)+1) for ii in range(self.ll_len)]
    
    """original function is different, modified to return something"""
    def get_ip(self): return np.random.random()
    
    """original function is different, modified to return something"""
    def get_op(self): return np.random.random(self.length)
    
    """original function is different, modified to return something"""    
    def get_innt(self, nu1, nu2, ip):
        return nu1*nu2/ip
    
    """original function is different, modified to return something"""    
    def __get_pp(self, nu1):
        return np.exp(nu1)
    
    """dummy function for the example purpose"""
    def dummy_function(self):
        """do important stuff"""
        return 
    
    """dummy function for the example purpose"""
    def dummy_function_2(self, result):
        """do important stuff"""
        return np.reshape(result, np.inshape)
    
    """dummy function for the example purpose"""
    def dummy_function_3(self):
        """do important stuff"""
        return
    
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_llp(self, ll, lp):
        return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]
        
    """NOTE, lp is not used here for the example purpose but
    in the original code, it's very important variable containg
    relevant data for calculations"""
    def generate(self, lp={}):
        """create a list that is used to the creation of 2-D array"""
        """providing here a dummy pp param to get_ll"""
        ll = self.get_ll(lp)
        ip = self.get_ip()
        
        self.op = self.get_op()
        
        """length of args_tmp = self.length * self.length = 256 * 256"""
        args_tmp = [
            {'self': self, 
             'nu1': nu1,  
             'nu2': nu2, 
             'ii': ii, 
             'jj': jj,
             'innt': np.abs(self.get_innt(nu1, nu2, ip)),
             'nu1exp': np.exp(1j*nu1*ip),
             'nu2exp': np.exp(1j*nu2*ip),
             } for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]
        
        pool = {}
        if self.multiprocessing: 
            pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)
        
        """number of arrays is equal to len of ll, here 300"""
        for ll_ in tqdm(ll):
            """Generate data"""
            self.__generate(ll_, lp, pool, args_tmp)
        
        """Create a pool of CPU threads"""
        if self.multiprocessing: 
            pool.terminate()

    def __generate(self, ll, lp, pool = {}, args_tmp = []):
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function()
        
        self.llp = self.get_llp(ll, lp)
        """originally the values is taken from lp"""
        self.rr = self.rr
        
        if self.multiprocessing and pool: 
            result = pool.map(wrapper_, args_tmp)
        else: 
            result = [wrapper_(arg) for arg in args_tmp]
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        result = self.dummy_function_2(result)
    
    """original function is different"""
    def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii = 0, jj = 0, rr=0):
        if rr == 1 and self.inshape[0] - 1 - jj < ii: 
            return 0
        elif rr == -1 and ii > jj: 
            return 0
        elif rr == 0:
            """do nothing"""
        
        ll1 = []
        ll2 = []
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function_3()
        
        for kk, ll in enumerate(llp):
            ll1.append(
               self.__get_pp(nu1) * 
               nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
            )
            ll2.append(
               self.__get_pp(nu2) * 
               nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
            )
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result
    
    
    
g = tmp(False, 256, 300)
g.generate()
  • 1
    Can you post some example code? How long is the list/array? – Ian Wilson Jul 13 '20 at 04:08
  • 1
    See how to create a [mcve]. – Peter Wood Jul 13 '20 at 04:10
  • The list is 256x256=65536 elements long for each array. What makes the array generation so long is the math behind everything and a random of elements in one of the dictionary's param. I'll need to compress several class methods into a simple example. I'll try to post a minimal example ASAP. – Krzysztof Maliszewski Jul 13 '20 at 04:40
  • I added a minimal code to the question. It's the exact representation of my code in terms of hierarchy and execution, but, for some reason, it freezes for me on Windows. If you have the same problem, let me know and I'll check what can be done. You can turn parallel computing on and off to compare times. – Krzysztof Maliszewski Jul 13 '20 at 05:37
  • PS. The freezing problem happens to me with multiprocessing from time to time occasionally (sometimes it works, other times it doesn't) so it might not be the case for you. Again, let me know if you happen to have it too. – Krzysztof Maliszewski Jul 13 '20 at 05:44
  • Added point 6. to the Additional info. – Krzysztof Maliszewski Jul 13 '20 at 06:19
  • I noticed a few odd things in your code when making my answer, you don't use `ii` in generate_2, therefore not using the contents of each `ip` in `ip_list`. Is that intentional? – Ian Wilson Jul 14 '20 at 04:17
  • Also, it seems that there are 300*256 calls of generate_parallel, and not 256*256 since ip_list is range(300). Is that intentional? – Ian Wilson Jul 14 '20 at 04:28
  • Ian, you misread the code. There are 2 crucial variables: ip_list, a list of lists of random length (in the code there're 300 elements inside); args_tmp which is built from a certain list with len of 256 using list comprehension on two loops across the list, so args_tmp has a len of 256*256. Then, there is a loop across ip_list which calls generate_2 function with args_tmp parameter and generate_2 makes a pool.map call on wrapper_ with args_tmp argument which calls generate_array_elements. Hope that clarifies the number of calls made by the code :). – Krzysztof Maliszewski Jul 15 '20 at 00:39

2 Answers2

3

It is hard to tell what is going on in your algorithm. I don't know a lot about multiprocessing but it is probably safer to stick with functions and avoid passing self down into the pooled processes. This is done when you pass args_tmp to wrapper_ in pool.map(). Also overall, try to reduce how much data is passed between the parent and child processes in general. I try to move the generation of the lp list into the pool workers to prevent passing excessive data.

Lastly, altough I don't think it matters in this example code but you should be either cleaning up after using pool or using pool with with.

I rewrote some of your code to try things out and this seems faster but I'm not 100% it adheres to your algorithm. Some of the variable names are hard to distinguish.

This runs a lot faster for me but it is hard to tell if it is producing your solutions accurately. My final conclusion if this is accurate is that the extra data passing was significantly slowing down the pool workers.

#main.py
if __name__ == '__main__':
    import os
    import sys
    file_dir = os.path.dirname(__file__)
    sys.path.append(file_dir)

    from tmp import generate_1
    parallel = True
    generate_1(parallel)


#tmp.py
import multiprocessing as mp 
import numpy as np
import random
from tqdm import tqdm
from itertools import starmap

def wrapper_(arg):
    return arg['self'].generate_array_elements(
        nu1=arg['nu1'],
        nu2=arg['nu2'],
        ii=arg['ii'],
        jj=arg['jj'],
        lp=arg['self'].lp,
        nu1exp=arg['nu1exp'],
        nu2exp=arg['nu2exp'],
        innt=arg['innt']
    )

def generate_1(parallel):
    """create a list that is used to the creation of 2-D array"""
    il = np.random.random(256)
    """generating params for parallel data generation"""
    """some params are also calculated here to speed up the calculation process
    because they are always the same so they can be calculated just once"""
    """this code creates a list of 256*256 elements"""
    args_tmp = [
    {
     'nu1': nu1,  
     'nu2': nu2, 
     'ii': ii, 
     'jj': jj,
     'innt': np.random.random()*nu1+np.random.random()*nu2,
     'nu1exp': np.exp(1j*nu1),
     'nu2exp': np.exp(1j*nu2),
    } for ii, nu1 in enumerate(il) for jj, nu2 in enumerate(il)]

    """init pool"""
    

    """get list of arrays to generate"""
    ip_list = [random.sample((range(256)),int(np.random.random()*12)+1) for ii in range(300)]

    map_args = [(idx, ip, args_tmp) for idx, ip in enumerate(ip_list)]
    """separate function to do other important things"""
    if parallel:
        with mp.Pool(8, maxtasksperchild=10000) as pool:
            result = pool.starmap(start_generate_2, map_args)
    else:
        result = starmap(start_generate_2, map_args)
    # Wrap iterator in list call.
    return list(result)

def start_generate_2(idx, ip, args_tmp):
    print ('starting {idx}'.format(idx=idx))
    runner = Runner()
    result = runner.generate_2(ip, args_tmp)
    print ('finished {idx}'.format(idx=idx))
    return result

class Runner():

    def generate_2(self, ip, args_tmp):
        """NOTE, the method is much more extensive and uses other methods of the class""" 
        """so it must remain a method of the class that is not static!"""
        self.lp = [{'a': np.random.random(), 'b': np.random.random()} for ii in ip]
        """this part creates 1-D array of the length of args_tmp, that's 256*256"""
        result = map(wrapper_, [dict(args, self=self) for args in args_tmp])
        """it's then reshaped to 2-D array"""
        result = np.reshape(list(result), (256,256))
        return result
    
    def generate_array_elements(self, nu1, nu2, ii, jj, lp, nu1exp, nu2exp, innt):
        """doing heavy calc"""
        """"here is something else"""
        if ii > jj: return 0
            
        ll1 = []
        ll2 = []
        for kk, ll in enumerate(lp):
            ll1.append(nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random()))
            ll2.append(nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random()))
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result


I'm adding a generic template to show an architecture where you would split the preparation of the shared args away from the task runner and still use classes. The strategy here would be do not create too many tasks(300 seems faster than trying to split them down to 64000), and don't pass too much data to each task. The interface of launch_task should be kept as simple as possible, which in my refactoring of your code would be equivalent to start_generate_2.

import multiprocessing
from itertools import starmap


class Launcher():
    def __init__(self, parallel):
        self.parallel = parallel

    def generate_shared_args(self):
        return [(i, j) for i, j in enumerate(range(300))]

    def launch(self):
        shared_args = self.generate_shared_args()
        if self.parallel:
            with multiprocessing.Pool(8) as pool:
                result = pool.starmap(launch_task, shared_args)
        else:
            result = starmap(launch_task, shared_args)
        # Wrap in list to resolve iterable.
        return list(result)


def launch_task(i, j):
    task = Task(i, j)
    return task.run()


class Task():

    def __init__(self, i, j):
        self.i = i
        self.j = j

    def run(self):
        return self.i + self.j


if __name__ == '__main__':
    parallel = True
    launcher = Launcher(parallel)
    print(launcher.launch())

There is a warning about the cleanup of pool in the pool documentation here: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool

The first item discusses avoiding shared state and specifically large amounts of data. https://docs.python.org/3/library/multiprocessing.html#programming-guidelines

Ian Wilson
  • 6,223
  • 1
  • 16
  • 24
  • Thank you. I'll check it tomorrow my time. I pass 'self' because inside generate_parallel function I use another class' method and variable. I'll pass the variable as you did with lp and I'll make the method static. The latter I'll also do with generate_parallel method. At this moment to generate ~66k arrays it takes several days what is VERY painful. Hopefully, that will help. I'll let you know. – Krzysztof Maliszewski Jul 14 '20 at 11:15
  • Hey Ian. Yesterday I haven't noticed that you got rid off the class structure, leaving just functions. The problem is, generate_2 cannot be made a function outside of the class nor it can anyhow be made a static method because it must have the 'self' argument. At the moment I'm trying to pass the self argument in map_args for starmap without success. Can you change your code so it has a structure containing a class as in the original code? – Krzysztof Maliszewski Jul 14 '20 at 23:23
  • I removed the class because I stopped passing self. It didn't seem necessary and seems like it would be a source of bugs when using pool. How come you need to pass self? Is it in more extensive code not included in your question? – Ian Wilson Jul 15 '20 at 00:07
  • Yes, method generate_2 is much more extensive and calculates things using the class' methods. Moreover, it allows to turn on/off parallelism with just a simple variable value change so the same code isn't copied twice. I like the idea of calling pool on generate_2 and, then, on each element of an array. I think it should work. If you could help me arrange the code in a class manner, I'd be grateful. I'm still trying to do that on my own. – Krzysztof Maliszewski Jul 15 '20 at 00:23
  • I made a comment for you below the original post. – Krzysztof Maliszewski Jul 15 '20 at 00:40
  • @KrzysztofMaliszewski I dropped the class to avoid splitting the instance between the parent and all the workers in the pool because I think that is going to cause overhead and bugs. The only purpose in your example code for self is to carry `lp` into the wrapper call. Why not just pass it directly? Also note that `self` is not used in `generate_elements` at all. – Ian Wilson Jul 15 '20 at 01:48
  • in the original post I updated the code to better represent what happens in my original code. The class is large so I hope the update will make it clearer why the code works as it does now. I agree with you that some instances of self could be (re)moved. I made some change in my code (not shown in the post) to make it happen, i.e. what you suggested with lp, I also made generate_elements method static as well as put it outside of the class but, for unknown reasons, the code runs even slower than before even with multiprocessing off. – Krzysztof Maliszewski Jul 15 '20 at 02:58
  • I think you might be right in term of overhead and bugs with pool and parenting causing the app to slow down with each new array. I will be further trying to find the solution to the problem with some ideas you proposed so far. – Krzysztof Maliszewski Jul 15 '20 at 02:59
  • @KrzysztofMaliszewski I updated code so that it maps to a function that creates a class instance within each task, instead of splitting it between parent and children, it is sort of a compromise. I pass the parallel flag into generate_2 which remains a regular function. Note that starmap for parallel = False is from itertools. – Ian Wilson Jul 15 '20 at 03:03
  • @KrzysztofMaliszewski Added a minimal example with an architecture that might be more clear if you must use classes for the prep and running steps. – Ian Wilson Jul 15 '20 at 03:25
  • Ok, thank you. I'll have a look and try to implement my code according to your tips. I'll post an answer if I find it. Kind regards! – Krzysztof Maliszewski Jul 15 '20 at 04:12
  • Finally! Your tip about which function should be paralleled is a success. So instead of calling pool.map on "generate_elements", I am doing it on "generate_2" calculating each element of an array serially (no parallelism here). Now instead of having 7-15s to calculate an array, it takes 1.5it/s - 2s at most. ~66k arrays are generated in 0.5 day now. I'll post my code with the change soon but I'm accepting your post as an answer. Thank you, again, a lot, Ian. – Krzysztof Maliszewski Jul 15 '20 at 08:09
2

Ian Wilson's suggestions were very helpful and one them helped to resolve the issue. That's why his answer is marked as the correct one.

As he suggested, it's better to call pool on a smaller number of tasks. So instead of calling pool.map for each array (N) that is created 256*256 times for each array's element (so N*256*256 tasks in total), now I call pool.map on the function that calculates the whole array so just N times. The array calculation inside the function is done in a serialised way.

I'm still sending self as a param because it's needed in the function but it doesn't have any impact on the performance.

That small change speeds-up a calculation of an array from 7-15s up to 1.5it/s-2s/it!

CURRENT CODE:

import multiprocessing as mp 
import tqdm
import numpy as np
import random

def wrapper_(arg):
    return tmp.generate_array_elements(
        self=arg['self'], 
        nu1=arg['nu1'], 
        nu2=arg['nu2'], 
        innt=arg['innt'], 
        nu1exp=arg['nu1exp'], 
        nu2exp=arg['nu2exp'], 
        ii=arg['ii'], 
        jj=arg['jj'],
        llp=arg['self'].llp, 
        rr=arg['self'].rr, 
    )

"""NEW WRAPPER HERE"""
"""Sending self doesn't have bad impact on the performance, at least I don't complain :)"""
def generate(arg):
   tmp._tmp__generate(arg['self'], arg['ll'], arg['lp'], arg['pool'], arg['args_tmp'])

class tmp:
    def __init__(self, multiprocessing, length, n_of_arrays):
        self.multiprocessing = multiprocessing
        self.inshape = (length,length)
        self.length = length
        self.ll_len = n_of_arrays
        self.num_cpus = 8
        self.maxtasksperchild = 10000
        self.rr = 0
        
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_ll(self, lp): 
        return [random.sample((range(self.length)),int(np.random.random()*12)+1) for ii in range(self.ll_len)]
    
    """original function is different, modified to return something"""
    def get_ip(self): return np.random.random()
    
    """original function is different, modified to return something"""
    def get_op(self): return np.random.random(self.length)
    
    """original function is different, modified to return something"""    
    def get_innt(self, nu1, nu2, ip):
        return nu1*nu2/ip
    
    """original function is different, modified to return something"""    
    def __get_pp(self, nu1):
        return np.exp(nu1)
    
    """dummy function for the example purpose"""
    def dummy_function(self):
        """do important stuff"""
        return 
    
    """dummy function for the example purpose"""
    def dummy_function_2(self, result):
        """do important stuff"""
        return np.reshape(result, np.inshape)
    
    """dummy function for the example purpose"""
    def dummy_function_3(self):
        """do important stuff"""
        return
    
    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_llp(self, ll, lp):
        return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]
        
    """NOTE, lp is not used here for the example purpose but
    in the original code, it's very important variable containg
    relevant data for calculations"""
    def generate(self, lp={}):
        """create a list that is used to the creation of 2-D array"""
        """providing here a dummy pp param to get_ll"""
        ll = self.get_ll(lp)
        ip = self.get_ip()
        
        self.op = self.get_op()
        
        """length of args_tmp = self.length * self.length = 256 * 256"""
        args_tmp = [
            {'self': self, 
             'nu1': nu1,  
             'nu2': nu2, 
             'ii': ii, 
             'jj': jj,
             'innt': np.abs(self.get_innt(nu1, nu2, ip)),
             'nu1exp': np.exp(1j*nu1*ip),
             'nu2exp': np.exp(1j*nu2*ip),
             } for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]
        
        pool = {}
        
        """MAJOR CHANGE IN THIS PART AND BELOW"""
        map_args = [{'self': self, 'idx': (idx, len(ll)), 'll': ll, 'lp': lp, 'pool': pool, 'args_tmp': args_tmp} for idx, ll in enumerate(ll)]

        if self.multiprocessing: 
            pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)
            
            for _ in tqdm.tqdm(pool.imap_unordered(generate_js_, map_args), total=len(map_args)):
                pass
            pool.close()
            pool.join()
            pbar.close()
        else:
            for map_arg in tqdm.tqdm(map_args):
                generate_js_(map_arg)

    def __generate(self, ll, lp, pool = {}, args_tmp = []):
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function()
        
        self.llp = self.get_llp(ll, lp)
        """originally the values is taken from lp"""
        self.rr = self.rr
        
        """REMOVED PARALLEL CALL HERE"""
        result = [wrapper_(arg) for arg in args_tmp]
        
        """In the original code there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        result = self.dummy_function_2(result)
    
    """original function is different"""
    def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii = 0, jj = 0, rr=0):
        if rr == 1 and self.inshape[0] - 1 - jj < ii: 
            return 0
        elif rr == -1 and ii > jj: 
            return 0
        elif rr == 0:
            """do nothing"""
        
        ll1 = []
        ll2 = []
        
        """In the original code, there are plenty other things done in the code
        using class' methods, they are not shown here for the example purpose"""
        self.dummy_function_3()
        
        for kk, ll in enumerate(llp):
            ll1.append(
               self.__get_pp(nu1) * 
               nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
            )
            ll2.append(
               self.__get_pp(nu2) * 
               nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
            )
            
        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result
    
    
    
g = tmp(False, 256, 300)
g.generate()

Thank you Ian, again.

raylu
  • 2,630
  • 3
  • 17
  • 23