
In a data analysis Python project I need to use both classes and multiprocessing, and I haven't found a good example of combining them on Google.

My basic idea (which is probably wrong) is to create a class holding a large variable (a pandas dataframe in my case) and then to define a method that computes an operation on it (a sum in this case).

import multiprocessing
import time

class C:
    def __init__(self):
        # large dataset (stands in for the pandas dataframe)
        self.__data = list(range(0, 10**7))

    def func(self, nums):
        # work done inside each worker process
        return sum(nums)

    def start_multi(self):
        for n_procs in range(1, 4):
            print()
            time_start = time.perf_counter()  # time.clock() was removed in Python 3.8
            # split the data into n_procs roughly equal slices
            chunks = [self.__data[(i - 1) * len(self.__data) // n_procs: i * len(self.__data) // n_procs]
                      for i in range(1, n_procs + 1)]
            pool = multiprocessing.Pool(processes=n_procs)
            results = pool.map_async(self.func, chunks)
            results.wait()
            pool.close()
            results = results.get()
            print(sum(results))
            print("n_procs", n_procs, "total time: ", time.perf_counter() - time_start)

if __name__ == '__main__':
    print('sum(list(range(0, 10**7)))', sum(list(range(0, 10**7))))
    c = C()
    c.start_multi()

The code doesn't work properly: I get the following print output

sum(list(range(0, 10**7))) 49999995000000

49999995000000
n_procs 1 total time:  0.45133500000000026

49999995000000
n_procs 2 total time:  0.8055279999999954

49999995000000
n_procs 3 total time:  1.1330870000000033

That is, the computation time increases instead of decreasing as I add processes. So what is the error in this code?

But I'm also worried about the RAM usage: when the chunks variable is created, the memory taken by self.__data is effectively doubled. Is it possible, in multiprocessing code in general and in this code in particular, to avoid this memory waste? (I promise I'll put everything on Spark in the future :) )
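
For reference, one way to avoid the extra copy is to send only (start, end) index pairs to the workers and let them read a module-level dataset. Below is a minimal sketch, not from the original post, assuming a Unix "fork" start method so the children inherit the parent's memory copy-on-write; note that for plain Python lists reference counting can still dirty the shared pages, so this helps most when the bulk of the data lives in NumPy/pandas buffers.

import multiprocessing

# Minimal sketch: module-level data that forked workers inherit copy-on-write,
# so only small (start, end) tuples are pickled and sent to the pool.
DATA = list(range(0, 10**7))

def partial_sum(bounds):
    start, end = bounds
    # each worker reads only its own slice of the inherited DATA
    return sum(DATA[start:end])

if __name__ == '__main__':
    n_procs = 4
    step = len(DATA) // n_procs
    bounds = [(i * step, (i + 1) * step if i < n_procs - 1 else len(DATA))
              for i in range(n_procs)]
    with multiprocessing.Pool(processes=n_procs) as pool:
        print(sum(pool.map(partial_sum, bounds)))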

user1403546
  • You seem to understand precisely: you have to create copies and send them to the different processes. There's no way this will be faster than a normal `sum`. As an aside, is there a particular reason you are using double-underscores for your `__data` attribute? – juanpa.arrivillaga Oct 31 '17 at 15:58
  • But this has nothing to do with you using a class. You should read the section in the `multiprocessing` docs about sharing state. It isn't trivial. – juanpa.arrivillaga Oct 31 '17 at 16:00
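
For the kind of sharing that docs section describes, here is a minimal sketch (not from the comment, and it assumes NumPy, which the original code does not use): the data is backed by a shared multiprocessing.Array created without a lock and handed to each worker through the pool initializer, so only index pairs travel between processes.

import multiprocessing
import numpy as np

shared_data = None  # set in each worker by the pool initializer

def init_worker(shared_array):
    # make the shared ctypes array visible to partial_sum in this worker
    global shared_data
    shared_data = shared_array

def partial_sum(bounds):
    start, end = bounds
    # np.frombuffer gives a zero-copy NumPy view onto the shared buffer
    view = np.frombuffer(shared_data, dtype=np.int64)
    return int(view[start:end].sum())

if __name__ == '__main__':
    n = 10**7
    # lock=False because the workers only read; the parent writes once up front
    shared = multiprocessing.Array('q', n, lock=False)
    np.frombuffer(shared, dtype=np.int64)[:] = np.arange(n, dtype=np.int64)
    n_procs = 4
    step = n // n_procs
    bounds = [(i * step, (i + 1) * step if i < n_procs - 1 else n)
              for i in range(n_procs)]
    with multiprocessing.Pool(n_procs, initializer=init_worker,
                              initargs=(shared,)) as pool:
        print(sum(pool.map(partial_sum, bounds)))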

1 Answer


It looks like there are a few things at play here:

  1. The chunking operation is pretty slow. On my computer, generating the chunks took about 16% of the total time in the multi-process cases. The single-process, non-pool version doesn't have that overhead.
  2. You are sending a lot of data into your processes. The chunks list holds all of the raw data for the ranges, which has to be pickled and sent over to the new processes. Instead of sending all the raw data, it would be much easier to send just the start and end indices.
  3. In general, if you put timers inside your func you'll see that most of the time is not being spent there (a small sketch of this follows the list). That's why you aren't seeing a speedup: most of the time goes to chunking, pickling, forking, and other overhead.
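
For instance, a tiny sketch (not from the original answer) of what putting a timer inside func could look like, so you can compare the in-worker time with the totals printed by start_multi:

import time

class C:
    # ... __init__ and start_multi exactly as in the question ...

    def func(self, nums):
        # time only the work the worker actually does; the gap between this and
        # the total printed in start_multi is chunking/pickling/fork overhead
        t0 = time.perf_counter()
        result = sum(nums)
        print("inside func: {:.4f}s for {} items".format(time.perf_counter() - t0, len(nums)))
        return result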

As an alternative, try switching the chunking technique so that it only computes start and end numbers, which avoids sending so much data.

Next, I would recommend doing something a little more computationally demanding than computing a sum. For example, you can try counting primes. Here is an example that uses a simple primality test (credited in the code) together with a modified chunking technique; otherwise I tried to keep the code the same.

import multiprocessing
import time
from math import sqrt
from itertools import count, islice

# credit to https://stackoverflow.com/a/27946768
def isPrime(n):
    return n > 1 and all(n % i for i in islice(count(2), int(sqrt(n) - 1)))

limit = 6

class C:
    def __init__(self):
        pass

    def func(self, start_end_tuple):
        # count the primes in [start, end)
        start, end = start_end_tuple
        primes = []
        for x in range(start, end):
            if isPrime(x):
                primes.append(x)
        return len(primes)

    def get_chunks(self, total_size, n_procs):
        # start and end value tuples
        chunks = []

        # Example: (10, 5) -> (2, 0) so 2 numbers per process
        # (10, 3) -> (3, 1) or here the first process does 4 and the others do 3
        quotient, remainder = divmod(total_size, n_procs)
        current_start = 0
        for i in range(0, n_procs):
            my_amount = quotient
            if i == 0:
                # somebody needs to do extra
                my_amount += remainder
            chunks.append((current_start, current_start + my_amount))
            current_start += my_amount
        return chunks

    def start_multi(self):
        for n_procs in range(1, 4):
            time_start = time.perf_counter()  # time.clock() was removed in Python 3.8
            # chunk the start and end indices instead
            chunks = self.get_chunks(10**limit, n_procs)
            pool = multiprocessing.Pool(processes=n_procs)
            results = pool.map_async(self.func, chunks)
            results.wait()
            pool.close()
            results = results.get()
            print(sum(results))
            time_delta = time.perf_counter() - time_start
            print("n_procs {} time {}".format(n_procs, time_delta))

if __name__ == '__main__':
    c = C()
    time_start = time.perf_counter()
    print("serial func(...) = {}".format(c.func((1, 10**limit))))
    print("total time {}".format(time.perf_counter() - time_start))
    c.start_multi()

This should result in a speedup for the multi-process runs, assuming you have the cores for it.
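
If you want the benchmark loop to adapt to the machine rather than hard-coding range(1, 4), multiprocessing.cpu_count() reports the number of cores the OS sees; a small sketch, not part of the original answer:

import multiprocessing

# size the loop to the cores the OS reports; for a CPU-bound task,
# extra processes beyond this mostly add scheduling overhead
n_cores = multiprocessing.cpu_count()
for n_procs in range(1, n_cores + 1):
    ...  # same body as in start_multi above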

Paul