1
  1. Manager code:

    import pandas as pd
    import multiprocessing
    import time
    import MyDF
    import WORKER


    class Manager():
        'Common base class for all Manager'
        def __init__(self, Name):
            print('Hello Manager..')
            self.MDF = MyDF.MYDF(Name)
            self.Arg = self.MDF.display()
            self.WK = WORKER.Worker(self.Arg)


    MGR = Manager('event_wise_count')

    if __name__ == '__main__':
        jobs = []
        x = 5
        for i in range(5):
            x = 10 * i
            print('Manager : ', i)
            p = multiprocessing.Process(target=MGR.WK.DISPLAY)
            jobs.append(p)
            p.start()
            time.sleep(x)
    
  2. Worker code:

    import pandas as pd
    import time


    class Worker():
        'Common base class for all Workers'
        empCount = 0

        def __init__(self, DF):
            self.DF = DF
            print('Hello worker..', self.DF.count())

        def DISPLAY(self):
            self.DF = self.DF.head(10)
            return self.DF
    

    Hi, I am trying to use multiprocessing, and I want to share a single DataFrame with all sub-processes by reference.

    In the code above, the manager code spawns 5 processes, and each sub-process needs to use the DataFrame held by the Worker class. I expected every sub-process to share a reference to that one DataFrame, but that is not what happens.

    Any answer is welcome.

    Thanks in advance.

  • Hi, I am trying to use multiprocessing in an object-oriented design. Simply put, I want all processes to use one DataFrame by reference. In the code above, the manager code starts 5 processes, and each one uses the DataFrame held by the Worker class; all of them should share a reference to that DataFrame, but that is not happening. – Achyuta nanda sahoo Apr 20 '15 at 12:56
  • Sorry, I've tried reformatting your code and it's a mess. Can you sort it out, and can you reduce the amount of code? Thanks. – EdChum Apr 20 '15 at 12:57
  • Hi @EdChum, I have split the code into two list items: 1) Manager code and 2) Worker code. The code is not rendering properly in the text field, so please read it including both the bold and the regular text. If you still need clarification, I can explain my overall requirement. – Achyuta nanda sahoo Apr 20 '15 at 13:14
  • Forgive me for asking the obvious, but what are you trying to achieve by using a multiprocessor design? – Alexander Apr 20 '15 at 21:38
  • Hi all, I have one question. Let DF be a DataFrame that contains 1 million records, of which I want only the top 100, so I did DF = DF.head(100). Will DF be written to the same memory location or to a different one? – Achyuta nanda sahoo Apr 21 '15 at 05:40
  • I need to share a DataFrame across multiple processes. One manager process writes to the DataFrame and the other worker processes read from it. If I create the worker objects by passing a reference to the DataFrame that was initially created in my manager, will the workers share the same DataFrame or a copy of it? If each worker has its own copy, it would consume a lot of memory, which is why I am concerned about how passing it by reference behaves. – Achyuta nanda sahoo Apr 21 '15 at 05:48

2 Answers

2

This answer suggests using Namespaces to share large objects between processes by reference.

Here's an example of an application where 4 different processes can read from the same DataFrame. (Note: you can't run this on an interactive console -- save this as a program.py and run it.)

import numpy as np
import pandas as pd
from multiprocessing import Manager, Pool


def get_slice(namespace, column, rows):
    '''Return the first `rows` rows from column `column` in namespace.data'''
    return namespace.data[column].head(rows)

if __name__ == '__main__':
    # Create a namespace to place our DataFrame in it
    manager = Manager()
    namespace = manager.Namespace()
    namespace.data = pd.DataFrame(np.random.rand(1000, 10))

    # Create a pool of 4 worker processes
    pool = Pool(processes=4)
    for column in namespace.data.columns:
        # Each worker process reads the DataFrame through the namespace proxy
        result = pool.apply_async(get_slice, [namespace, column, 5])
        print(column, result.get().tolist())
    pool.close()
    pool.join()

While reading from the DataFrame is perfectly fine, it gets a little tricky if you want to write back to it. It's better to stick to immutable objects unless you really need large writable objects.
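
To illustrate why writing back is tricky, here is a minimal sketch, assuming the same `manager.Namespace()` setup as above. The helper `append_column` is a hypothetical name, not part of `multiprocessing` or pandas. Reading `namespace.data` hands back a copy of the stored DataFrame, so an in-place change to that copy is not propagated; the modified object has to be assigned back to the namespace.

```python
import pandas as pd
from multiprocessing import Manager


def append_column(namespace, name, values):
    """Hypothetical helper: add a column to the DataFrame stored in a namespace."""
    df = namespace.data      # attribute access through the proxy returns a local copy
    df[name] = values        # this changes only the local copy
    namespace.data = df      # reassigning sends the updated DataFrame back


if __name__ == '__main__':
    with Manager() as manager:
        namespace = manager.Namespace()
        namespace.data = pd.DataFrame({'a': [1, 2, 3]})

        # Lost update: the new column is added to a local copy only
        local = namespace.data
        local['b'] = [4, 5, 6]
        print(list(namespace.data.columns))

        # Read, modify, then reassign so the manager stores the change
        append_column(namespace, 'b', [4, 5, 6])
        print(list(namespace.data.columns))
```

The read-modify-reassign step is not atomic, so if several processes write, guarding it with a `manager.Lock()` would be one way to avoid lost updates.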

S Anand
  • I need to share a DataFrame across multiple processes. One manager process writes to the DataFrame and the other worker processes read from it. If I create the worker objects by passing a reference to the DataFrame that was initially created in my manager, will the workers share the same DataFrame or a copy of it? If each worker has its own copy, it would consume a lot of memory, which is why I am concerned about how passing it by reference behaves. – Achyuta nanda sahoo Apr 21 '15 at 05:55
  • @S Anand, your answer will still use a huge amount of memory. Four processes may be able to access one DataFrame, but each process creates its own memory area, which is costly. Please correct me if I am wrong. – Achyuta nanda sahoo Apr 21 '15 at 09:33
  • @S Anand, why are you using the get_slice() function in the code above? And if I simply want the DataFrame to be visible to all processes without any changes, what do I need? – Achyuta nanda sahoo Apr 22 '15 at 07:18
  • @Achyutanandasahoo, I too have had trouble with @SAnand 's approach (using `manager.Namespace()` to share DataFrames between processes), particularly if the df is large (millions of rows). Even if I only have 1 worker child process, doing a simple `df.loc[ix]` can start taking many seconds each time, and the worker does seem to be duplicating the data from the managing process. (In single process profiling, the same operation is really fast) – Scott H May 09 '15 at 10:48
1

Sorry about the necromancy.

The issue is that the workers must have unique DataFrame instances. Almost all attempts to slice, or chunk, a Pandas DataFrame will result in aliases to the original DataFrame. These aliases will still result in resource contention between workers.
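
As a small illustration of the difference between a derived object and an independent instance, the sketch below takes the first rows of a DataFrame and copies them explicitly with `DataFrame.copy(deep=True)`. The helper name `independent_head` is hypothetical. Whether a plain `df.head(n)` shares memory with `df` depends on the pandas version and settings, so the explicit copy is the part to rely on.

```python
import numpy as np
import pandas as pd


def independent_head(df, n):
    """Hypothetical helper: return the first n rows as a DataFrame that owns its data."""
    return df.head(n).copy(deep=True)


df = pd.DataFrame(np.arange(20).reshape(5, 4), columns=list('abcd'))
top = independent_head(df, 2)

top.loc[0, 'a'] = -1        # change the copy only
print(df.loc[0, 'a'])       # the original DataFrame keeps its value
print(top is df)            # head() always returns a new DataFrame object
```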

There are two things that should improve performance. The first would be to make sure that you are working with Pandas rather than against it. Iterating row by row, with iloc or iterrows, fights against the design of DataFrames. Using a new-style class object and the apply method is one option.

import numpy as np
import pandas as pd

def get_example_df():
    return pd.DataFrame(np.random.randint(10, 100, size=(5,5)))

class Math(object):
    def __init__(self):
        self.summation = 0

    def operation(self, row):
        row_result = 0
        for elem in row:
            if elem % 2:
                row_result += elem
            else:
                row_result += 1
        self.summation += row_result
        if row_result % 2:
            return row_result
        else:
            return 1

    def get_summation(self):
        return self.summation

Custom = Math()
df = get_example_df()
# axis=1 passes each row to operation(); the default (axis=0) would pass columns
df['new_col'] = df.apply(Custom.operation, axis=1)
print(Custom.get_summation())
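
For comparison, the same arithmetic can be written with whole-DataFrame operations instead of a Python loop over elements. This is only a sketch, assuming the operation is meant to run once per row (`axis=1`); the function names `row_sums_odd_or_one` and `summarize` are hypothetical.

```python
import pandas as pd


def row_sums_odd_or_one(df):
    """Per row, add odd values as they are and count every even value as 1."""
    adjusted = df.where(df % 2 == 1, 1)    # keep odd entries, replace even ones with 1
    return adjusted.sum(axis=1)


def summarize(df):
    """Return (new column, total), mirroring operation() and get_summation()."""
    row_sums = row_sums_odd_or_one(df)
    new_col = row_sums.where(row_sums % 2 == 1, 1)   # odd sums are kept, even sums become 1
    return new_col, int(row_sums.sum())


example = pd.DataFrame([[11, 12, 13], [10, 20, 31]])
new_col, total = summarize(example)
print(new_col.tolist(), total)
```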

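The second option would be to read in, or generate, a separate DataFrame for each worker, then recombine the results if desired.

workers = 5
# Build one independent DataFrame per worker;
# [get_example_df()] * workers would repeat a single object 5 times
df_list = [get_example_df() for _ in range(workers)]
...
# worker code
...
aggregated = pd.concat(df_list, axis=0)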
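
One possible way to fill in the elided worker code is sketched below, assuming each worker can generate its own DataFrame from a seed. The functions `build_and_process` and `combine` and the `worker` column are hypothetical and stand in for whatever loading and processing the real workers do.

```python
import numpy as np
import pandas as pd
from multiprocessing import Pool


def build_and_process(seed):
    """Hypothetical worker: create a private DataFrame and tag it with its seed."""
    rng = np.random.default_rng(seed)
    df = pd.DataFrame(rng.integers(10, 100, size=(5, 5)))
    df['worker'] = seed
    return df


def combine(frames):
    """Stack the per-worker results into one DataFrame."""
    return pd.concat(frames, axis=0, ignore_index=True)


if __name__ == '__main__':
    workers = 5
    with Pool(processes=workers) as pool:
        # Each task builds its DataFrame inside the worker process
        frames = pool.map(build_and_process, range(workers))
    print(combine(frames).shape)   # (25, 6)
```

Only the finished results are pickled back to the parent, so this trades shared state for the cost of returning each worker's output.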

However, multiprocessing will not be necessary in most cases. I've processed more than 6 million rows of data without multiprocessing in a reasonable amount of time (on a laptop).

Note: I did not time the above code and there is probably room for improvement.