
I have implemented multiprocessing for a problem involving large objects, like the following:

import time
import pathos.multiprocessing as mp
from functools import partial
from random import randrange


class RandomNumber():
    def __init__(self, object_size=100):
        self.size = bytearray(object_size*10**6)  # 100 MB size
        self.foo = None

    def do_something(self, *args, **kwargs):
        self.foo = randrange(1, 10)
        time.sleep(0.5)  # wait for 0.5 seconds
        return self


def wrapper(random_number, *args, **kwargs):
    return random_number.do_something(*args, **kwargs)


if __name__ == '__main__':
    # create data
    numbers = [RandomNumber() for m in range(0, 9)]
    kwds = {'add': randrange(1, 10)}

    # calculate
    pool = mp.Pool(processes=mp.cpu_count())
    result = pool.map_async(partial(wrapper, **kwds), numbers)
    result = result.get()  # blocks until all workers have finished

    # print result
    my_results = [i.foo for i in result]
    print(my_results)

    pool.close()
    pool.join()

which yields something like:

[8, 7, 8, 3, 1, 2, 6, 4, 8]

Now the problem is that I get a massive improvement in performance compared to using a list comprehension when the objects are very small, but this improvement turns into the opposite for larger object sizes, e.g. 100 MB and above.

From the documentation and other questions I have discovered that this is caused by the use of pickle/dill to serialize each object in order to pass it to the workers within the pool. In other words: the objects are copied, and this serialization becomes a bottleneck as it is more time consuming than the actual calculation.
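To see the serialization cost in isolation, one can time a single pickle round; a quick sketch, assuming the RandomNumber class from above is in scope:

import pickle
import time

obj = RandomNumber()  # holds a ~100 MB bytearray

start = time.time()
data = pickle.dumps(obj)  # this happens for every object sent to a worker
print(f"pickled {len(data) / 10**6:.0f} MB in {time.time() - start:.2f} s")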

I have already tried to work on the same object using a multiprocessing.Manager, but this resulted in even higher runtimes.
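Roughly, that Manager attempt looked like the following (a simplified sketch of the idea, not my exact code; MyManager is just an illustrative name, and the RandomNumber class from above is assumed to be in scope). It is slow because every method call becomes an inter-process round trip, and the object returned by do_something() (self, with its 100 MB payload) is pickled back to the caller anyway:

from multiprocessing.managers import BaseManager


class MyManager(BaseManager):
    """Manager that can host RandomNumber instances in a server process."""
    pass


# must be registered at module level so the server process can see it
MyManager.register('RandomNumber', RandomNumber)

if __name__ == '__main__':
    with MyManager() as manager:
        shared = manager.RandomNumber()  # lives in the manager process
        # each call is routed through the manager via IPC; the returned
        # object (self, including the 100 MB bytearray) is pickled back
        result = shared.do_something()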

The problem is that I am bound to a specific class structure (here represented by RandomNumber()) which I cannot change.

Now my question is: are there any ways or concepts to circumvent this behaviour and run my calls to do_something() without the overhead of serialization or copying?

Any hints are welcome. Thanks in advance!

Cord Kaldemeyer

2 Answers


You need to use batch processing. Do not create and destroy workers for each number; make a limited number of workers based on cpu_count. Then pass a list to each worker and process it there: use map and pass it a list containing batches of numbers, as in the sketch below.
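A minimal sketch of this idea, reusing the RandomNumber class from the question (batch_size is an illustrative value to tune; note that batching amortizes the per-task dispatch overhead, while each object is still serialized once when its batch is sent):

import pathos.multiprocessing as mp

# RandomNumber as defined in the question


def process_batch(batch):
    # one task per sublist: the per-task overhead is paid once per
    # batch instead of once per object
    return [number.do_something() for number in batch]


if __name__ == '__main__':
    numbers = [RandomNumber() for _ in range(9)]

    batch_size = 3  # tune this value for your workload
    batches = [numbers[i:i + batch_size]
               for i in range(0, len(numbers), batch_size)]

    pool = mp.Pool(processes=mp.cpu_count())
    results = pool.map(process_batch, batches)
    pool.close()
    pool.join()

    # flatten the per-batch results back into a single list
    my_results = [number.foo for batch in results for number in batch]
    print(my_results)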

vks
  • Could you provide a minimal example or a link on how to implement this? – Cord Kaldemeyer Jan 14 '20 at 10:02
  • @CordKaldemeyer I am away from my computer... will try to give an idea: from your main list, create sublists, then pass each sublist to map. In your function, loop over the sublist and do something. Once you run this, you will find the optimal size of the batch or sublist. – vks Jan 14 '20 at 10:04
  • When you start running this, play with the batch size; you will eventually find the correct size for this task. – vks Jan 14 '20 at 10:05

I have found a solution using multiprocessing or multithreading from the concurrent.futures library which does not require pickling the objects. In my case, multithreading using ThreadPoolExecutor brings a clear advantage over multiprocessing via ProcessPoolExecutor.

import time
from random import randrange
import concurrent.futures as cf


class RandomNumber():
    def __init__(self, object_size=100):
        self.size = bytearray(object_size*10**6)  # 100 MB size
        self.foo = None

    def do_something(self, *args, **kwargs):
        self.foo = randrange(1, 10)
        time.sleep(0.5)  # wait for 0.5 seconds
        return self


def wrapper(random_number, *args, **kwargs):
    return random_number.do_something(*args, **kwargs)


if __name__ == '__main__':
    # create data
    numbers = [RandomNumber() for m in range(0, 100)]

    # run; on exit, the with-block waits for all threads to finish
    with cf.ThreadPoolExecutor(max_workers=3) as executor:
        result = executor.map(wrapper, numbers, timeout=5*60)

    # print result
    my_results = [i.foo for i in result]
    print(my_results)

yields:

[3, 3, 1, 1, 3, 7, 7, 6, 7, 5, 9, 5, 6, 5, 6, 9, 1, 5, 1, 7, 5, 3, 6, 2, 9, 2, 1, 2, 5, 1, 7, 9, 2, 9, 4, 9, 8, 5, 2, 1, 7, 8, 5, 1, 4, 5, 8, 2, 2, 5, 3, 6, 3, 2, 5, 3, 1, 9, 6, 7, 2, 4, 1, 5, 4, 4, 4, 9, 3, 1, 5, 6, 6, 8, 4, 4, 8, 7, 5, 9, 7, 8, 6, 2, 3, 1, 7, 2, 4, 8, 3, 6, 4, 1, 7, 7, 3, 4, 1, 2]

real    0m21.100s
user    0m1.100s
sys     0m2.896s

Nonetheless, this still leads to excessive memory usage in cases where I have too many objects (here numbers), and it does not prevent this by falling back to some "batch mode" when memory has to be swapped, i.e. the system freezes until the task has finished.

Any hints on how to prevent this?
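One direction might be to build on the batching idea from the other answer and create the large objects in bounded chunks, keeping only the small results; an untested sketch, assuming the RandomNumber class and wrapper function from above (chunk_size is an illustrative value):

import concurrent.futures as cf


def run_in_chunks(total=100, chunk_size=10, max_workers=3):
    results = []
    with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for start in range(0, total, chunk_size):
            # build only one chunk of large objects at a time
            chunk = [RandomNumber()
                     for _ in range(min(chunk_size, total - start))]
            # keep only the small .foo values so the 100 MB payloads
            # become garbage collectable before the next chunk is built
            results.extend(obj.foo for obj in executor.map(wrapper, chunk))
    return results


if __name__ == '__main__':
    print(run_in_chunks())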

Cord Kaldemeyer
  • I think this works because the ThreadPoolExecutor uses a single CPU to process the different threads you have created, so Python understands that the memory is shared and it is not copied to each thread. Nonetheless, this still uses a single CPU. – fabad Jul 26 '23 at 08:36