I would like to store the results of the work in specific variables after multiprocessing, as shown below.

Alternatively, I would like to save the results of the job as a CSV file. How can I do this?

This is my code:

(I want to get the 'df4' and 'df7' data and save them to a CSV file.)

import pandas as pd
from pandas import DataFrame
import time
import multiprocessing

df2 = pd.DataFrame()
df3 = pd.DataFrame()
df4 = pd.DataFrame()
df5 = pd.DataFrame()
df6 = pd.DataFrame()
df7 = pd.DataFrame()
df8 = pd.DataFrame()

date = '2011-03', '2011-02' ........ '2021-03'    # 120 dates in total
list1 = df1['resion'].drop_duplicates()  # 20 unique regions; 'df1' is the original data

#I'd like to divide the list and work on it. 
list11 = list1.iloc[0:10]
list12 = list1.iloc[10:20]

#It's a function using 'list11'.
def cal1():
    global df2
    global df3
    global df4

    start = time.time()

    for i, t in enumerate(list11):    
        df2 = pd.DataFrame(df1[df1['resion'] == t])  #'df1' is original data

        if i%2 == 0:
            print ("cal1 function processing: ", i)
            end = time.time()
            print (end-start)

        else:
            pass

        for n, d in enumerate(date):               
            df3 = pd.DataFrame(df2[df2['date'] == d])
            df3['number'] = df3['price'].rank(pct=True, ascending = False )
            df4 = df4.append(pd.DataFrame(df3))

        return df4

#It's a function using 'list12'.

def cal2():
    global df5
    global df6
    global df7

    start = time.time()

    for i, t in enumerate(list12):    
        df5 = pd.DataFrame(df1[df1['resion'] == t])  #'df1' is original data

        if i%2 == 0:
            print ("cal2 function processing: ", i)
            end = time.time()
            print (end-start)

        else:
            pass

        for n, d in enumerate(date):               
            df6 = pd.DataFrame(df5[df5['date'] == d])
            df6['number'] = df6['price'].rank(pct=True, ascending = False )
            df7 = df7.append(pd.DataFrame(df6))

        return df7

## Multiprocessing code

if __name__ == "__main__":
    # creating processes
    p1 = multiprocessing.Process(target=cal1, args=())
    p2 = multiprocessing.Process(target=cal2, args=())
  
    # starting process 1
    p1.start()
    # starting process 2
    p2.start()
  
    # wait until process 1 is finished
    p1.join()
    
    # wait until process 2 is finished
    p2.join()
  
    # both processes finished
    print("Done!")
Tomerikoo
  • Does this answer your question? [How can I recover the return value of a function passed to multiprocessing.Process?](https://stackoverflow.com/questions/10415028/how-can-i-recover-the-return-value-of-a-function-passed-to-multiprocessing-proce) – Tomerikoo Mar 25 '21 at 08:18

1 Answer

It looks like your functions cal1 and cal2 are identical except that they loop over different input lists and assign their results to different global variables. This is not going to work: when you run them in a subprocess, they assign the global variable in the subprocess, and that has no effect whatsoever on the main process that started them. The return value of a function passed as the target of multiprocessing.Process is also discarded, so returning df4 or df7 does not help either.
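A minimal illustration of the difference, using made-up function names (set_global, square) rather than anything from the question: the global assigned in the child process stays unchanged in the parent, whereas Pool.map sends each return value back to the parent.

```python
import multiprocessing

counter = 0  # module-level global in the parent process


def set_global():
    # Runs in the child process: this changes the child's copy only
    global counter
    counter = 42


def square(x):
    # Runs in a worker process; the return value is sent back to the parent
    return x * x


if __name__ == "__main__":
    p = multiprocessing.Process(target=set_global)
    p.start()
    p.join()
    print(counter)  # prints 0: the child's assignment is not visible here

    with multiprocessing.Pool(2) as pool:
        squares = pool.map(square, [1, 2, 3])
    print(squares)  # prints [1, 4, 9]
```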

If you want to map a function over multiple input ranges across multiple processes, you can use a process Pool and Pool.map, which sends each call's return value back to the main process.

For example:

def cal(input_list):
    start = time.time()
    frames = []  # local results; globals are not shared between processes

    for i, t in enumerate(input_list):
        df2 = df1[df1['resion'] == t]  # 'df1' is the original data

        if i % 2 == 0:
            print("cal function processing: ", i)
            end = time.time()
            print(end - start)

        for d in date:
            df3 = df2[df2['date'] == d].copy()
            df3['number'] = df3['price'].rank(pct=True, ascending=False)
            frames.append(df3)  # DataFrame.append was removed in pandas 2.0

        # I kept the structure of your original loop, but I'm not sure it does
        # what you want: this return is indented inside the outer loop, so the
        # function returns after processing only the first item of input_list.
        # You probably want to dedent it one level so it runs after the loop.
        return pd.concat(frames)

Then create a process pool and divide up the input however you want (or, with a bit of tweaking, let Pool.map chunk the input for you and then reduce the outputs from map into a single output). Pool.map returns a list of the function's return values, in the same order as the inputs:

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        dfs = pool.map(cal, [list1.iloc[0:10], list1.iloc[10:20]])
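For the CSV part of the question: because Pool.map hands the results back to the main process, combining and saving them happens there. Below is a minimal sketch under stated assumptions: df1 and dates are tiny made-up stand-ins, rank_region is a hypothetical per-region worker, Pool.map splits the regions itself via chunksize, and the inner date loop is swapped for groupby(...).rank(...), which computes the same per-date percentile rank. The filename result.csv is an arbitrary choice.

```python
import multiprocessing

import pandas as pd

# Made-up stand-ins for the question's df1 and date list (illustration only).
# They live at module level so worker processes can see them.
df1 = pd.DataFrame({
    'resion': ['A', 'A', 'B', 'B', 'C'],
    'date': ['2011-03', '2011-03', '2011-03', '2011-02', '2011-03'],
    'price': [10, 20, 5, 7, 3],
})
dates = ['2011-03', '2011-02']


def rank_region(region):
    """Hypothetical worker: percentile rank of price within each date for one region."""
    sub = df1[(df1['resion'] == region) & df1['date'].isin(dates)].copy()
    # Highest price gets the smallest percentile, as in the question's code
    sub['number'] = sub.groupby('date')['price'].rank(pct=True, ascending=False)
    return sub


if __name__ == "__main__":
    regions = df1['resion'].drop_duplicates().tolist()
    with multiprocessing.Pool(2) as pool:
        # One DataFrame per region, in the same order as `regions`;
        # chunksize lets the pool hand out several regions per task
        parts = pool.map(rank_region, regions, chunksize=2)
    result = pd.concat(parts)  # reduce the per-region outputs into one DataFrame
    result.to_csv('result.csv', index=False)
    print(result)
```

If you would rather keep the two halves separate, like df4 and df7 in the question, `df4, df7 = pool.map(cal, [list1.iloc[0:10], list1.iloc[10:20]])` unpacks the two returned DataFrames, and each can be written with its own to_csv call.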

This is just to get you started. I would probably do a number of other things differently as well.
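Following up on the comment in the code above about the return statement, here is a minimal, self-contained sketch of the loop with the return moved after the outer loop, so that every region in the input is processed. It collects the pieces in a list and calls pd.concat once, since DataFrame.append was removed in pandas 2.0. The name cal_fixed, its explicit df1/dates parameters, and the tiny data set are assumptions for illustration, not part of the original code.

```python
import pandas as pd


def cal_fixed(input_list, df1, dates):
    """Hypothetical rewrite of cal(): percentile-rank prices per (region, date).

    df1 and dates are passed in explicitly (an assumption) instead of being
    read from module-level globals.
    """
    frames = []  # collect partial results, concatenate once at the end
    for t in input_list:
        df2 = df1[df1['resion'] == t]
        for d in dates:
            df3 = df2[df2['date'] == d].copy()  # copy() avoids SettingWithCopyWarning
            if df3.empty:
                continue  # no rows for this region/date combination
            df3['number'] = df3['price'].rank(pct=True, ascending=False)
            frames.append(df3)
    # The return sits AFTER the outer loop, so every region is processed
    return pd.concat(frames) if frames else pd.DataFrame()


# Tiny made-up data set, for illustration only
df1 = pd.DataFrame({
    'resion': ['A', 'A', 'B', 'B', 'B'],
    'date': ['2011-03', '2011-03', '2011-03', '2011-03', '2011-02'],
    'price': [10, 20, 5, 15, 7],
})
out = cal_fixed(['A', 'B'], df1, ['2011-03', '2011-02'])
print(out)
```

To use a function with extra parameters like this with Pool.map, you could bind them with functools.partial, e.g. `pool.map(partial(cal_fixed, df1=df1, dates=date), [list1.iloc[0:10], list1.iloc[10:20]])`.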

Iguananaut
  • Thank you for your advice. I tried the code you suggested. However, the for loop does not work: only the first item of each list is processed (I mean list1.iloc[0:1] and list1.iloc[10:11]). – Ketoziger log Mar 26 '21 at 10:46
  • Yes, see the comment I added in my copy of your code. The way you coded your loop in `cal1` and `cal2` is buggy because it returns from the loop after one iteration, but that's unrelated to the question about multiprocessing. – Iguananaut Mar 26 '21 at 12:08
  • Thank you for your kind explanation. It was very helpful. – Ketoziger log Mar 30 '21 at 23:49