
I am trying to use multiprocessing in Python to reduce computation time, but it seems that after adding multiprocessing the overall computation became significantly slower. I created 4 different processes and split the DataFrame into 4 smaller DataFrames, one as input to each process. After timing each process, it seems like the overhead cost is significant, and I was wondering if there is a way to reduce these overhead costs.

I am using Windows 7, Python 3.5, and my machine has 8 cores.

import datetime as dt
import multiprocessing
from functools import partial

import numpy as np
import pandas as pd


def doSomething(dataPassed, args):
    # processing data, and calculating outputs
    ...

def parallelize_dataframe(df, nestedApply):
    df_split = np.array_split(df, 4)
    pool = multiprocessing.Pool(4)
    df = pool.map(nestedApply, df_split)
    print('finished with Simulation')
    time = float((dt.datetime.now() - startTime).total_seconds())

    pool.close()
    pool.join()

def nestedApply(df):
    func2 = partial(doSomething, args=())
    res = df.apply(func2, axis=1)  # each row is passed to doSomething as dataPassed
    res = ...  # [output Tables]
    return res

if __name__ == '__main__':
    data = pd.read_sql_query(query, conn)  # query and conn are defined elsewhere
    parallelize_dataframe(data, nestedApply)
Hojin
    Can you list how much longer single threading took versus multiprocessing? – Fruitspunchsamurai Nov 22 '16 at 17:16
  • How many CPUs/cores do you have (real ones, not hyperthreads)? That looks like CPU intensive work so splitting into more than the number of cores will just slow things down. Also, how big are the data frames and how expensive is `doSomething`? To get the dataframe to each subprocess it has to be serialized (via pickle) and deserialized so if the frames are big and `doSomething` is cheap you will indeed see most of the time spent on overhead. – Oliver Dain Nov 22 '16 at 18:11
  • @Fruitspunchsamurai It took 26 minutes to run single-threaded, while it took 33 minutes just to run the mapping function and 71 minutes overall. – Hojin Nov 22 '16 at 18:52
  • @Oliver I have 8 real cores, and yes, the doSomething function is very CPU-intensive and expensive (it keeps all the results in memory until it has gone through all the data). I ran a DataFrame of about 25,000 rows, and each row has about 20 columns. – Hojin Nov 22 '16 at 19:03
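
One way to check how much the serialization mentioned in the comments actually costs is to time the pickle round trip for a single chunk. Below is a minimal sketch, assuming random data of roughly the shape described above (25,000 rows by 20 columns) as a stand-in for the real DataFrame:

import pickle
import time

import numpy as np
import pandas as pd

# stand-in for the real data: 25,000 rows x 20 columns of random values
df = pd.DataFrame(np.random.randn(25000, 20))
chunk = np.array_split(df, 4)[0]  # one of the four chunks handed to pool.map

start = time.perf_counter()
payload = pickle.dumps(chunk)     # what Pool.map does to send a chunk to a worker
restored = pickle.loads(payload)  # what the worker does to receive it
elapsed = time.perf_counter() - start

print('chunk size: %.1f MB, pickle round trip: %.3f s' % (len(payload) / 1e6, elapsed))

If the round trip is only a fraction of a second per chunk, copying the data is not what dominates the run times quoted above, and the overhead has to come from somewhere else.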

1 Answer

I would suggest using queues instead of providing your DataFrame as chunks. Copying each chunk takes a lot of resources and quite some time, and you could run out of memory if your DataFrame is really big. Using queues you can benefit from pandas' fast iterators. Here is my approach. The overhead shrinks as the workers become more complex; unfortunately, my workers are far too simple to really show that, but the sleep simulates some complexity.

import pandas as pd
import multiprocessing as mp
import numpy as np
import time


def worker(in_queue, out_queue):
    for row in iter(in_queue.get, 'STOP'):
        value = (row[1] * row[2] / row[3]) + row[4]
        time.sleep(0.1)
        out_queue.put((row[0], value))

if __name__ == "__main__":
    # fill a DataFrame
    df = pd.DataFrame(np.random.randn(int(1e5), 4), columns=list('ABCD'))

    in_queue = mp.Queue()
    out_queue = mp.Queue()

    # setup workers
    numProc = 2
    process = [mp.Process(target=worker,
                          args=(in_queue, out_queue)) for x in range(numProc)]

    # run processes
    for p in process:
        p.start()

    # iterator over rows
    it = df.itertuples()

    # fill queue and get data
    # code fills the queue until a new element is available in the output
    # fill blocks if no slot is available in the in_queue
    for i in range(len(df)):
        while out_queue.empty():
            # fill the queue
            try:
                row = next(it)
                in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True)  # row = (index, A, B, C, D) tuple
            except StopIteration:
                break
        row_data = out_queue.get()
        df.loc[row_data[0], "Result"] = row_data[1]

    # signals for processes stop
    for p in process:
        in_queue.put('STOP')

    # wait for processes to finish
    for p in process:
        p.join()

With numProc = 2 it takes about 50 seconds per loop; with numProc = 4 it is twice as fast.
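
To reproduce that comparison, the pipeline can be wrapped in a function and timed for different worker counts. The sketch below is a condensed, self-contained variant of the same approach; the smaller frame (1,000 rows) and the shorter sleep are assumptions so that it finishes quickly:

import multiprocessing as mp
import time

import numpy as np
import pandas as pd


def worker(in_queue, out_queue):
    # same worker as above: process rows until the STOP sentinel arrives
    for row in iter(in_queue.get, 'STOP'):
        value = (row[1] * row[2] / row[3]) + row[4]
        time.sleep(0.01)
        out_queue.put((row[0], value))


def run_pipeline(df, num_proc):
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(in_queue, out_queue))
             for _ in range(num_proc)]
    for p in procs:
        p.start()

    start = time.perf_counter()
    it = df.itertuples()
    for _ in range(len(df)):
        # keep feeding rows while no result is ready yet
        while out_queue.empty():
            try:
                row = next(it)
                in_queue.put(tuple(row))  # (index, A, B, C, D)
            except StopIteration:
                break
        idx, value = out_queue.get()
        df.loc[idx, 'Result'] = value
    elapsed = time.perf_counter() - start

    for p in procs:
        in_queue.put('STOP')
    for p in procs:
        p.join()
    return elapsed


if __name__ == '__main__':
    frame = pd.DataFrame(np.random.randn(1000, 4), columns=list('ABCD'))
    for n in (2, 4):
        print('%d workers: %.1f s' % (n, run_pipeline(frame.copy(), n)))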

RaJa