I am building an in-memory, real-time "big data" computation module in Python using pandas. Response time is the quality measure of this module, so it is critical. To process the large data set, I split the data and process the splits in parallel.
Most of the time is spent in the part that stores the result of the sub-data (line 21 in the code below). I suspect that an internal deep copy of the memory occurs, or that the sub-DataFrames passed to the workers are not shared in memory.
If I had written the module in C or C++, I would use a pointer or a reference, like below:
"process=Process(target=addNewDerivedColumn, args=[resultList, &sub_dataframe])"
or
"process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
def addNewDerivedColumn(resultList, split_sub_dataframe&):.... "
Is there a good way to avoid the deep copy, or to reduce the time spent in multiprocessing? "Not elegant" is fine; I am ready to make my code dirty. I have tried weakref, RawValue, RawArray, Value, and Pool, but all failed.
The module is being developed on macOS and will ultimately run on Linux or Unix. Windows does not need to be considered.
Here is the code. The real code is at my office, but the structure and logic are the same as the real one.
 1  # -*- coding: UTF-8 -*-
 2  import pandas as pd
 3  import numpy as np
 4  from multiprocessing import *
 5  import time
 6
 7
 8  def addNewDerivedColumn(resultList, split_sub_dataframe):
 9
10      split_sub_dataframe['new_column'] = np.abs(split_sub_dataframe['column_01'] + split_sub_dataframe['column_02']) / 2
11
12      print(split_sub_dataframe.head())
13
14      '''
15      I think the whole sub-DataFrame is copied into resultList here,
16      not a reference, and this is where most of the time is spent.
17      Compare the elapsed time with line 21 commented out and then uncommented:
18      on MS Windows there is no significant difference,
19      but on Linux or Mac OS the difference is large.
20      '''
21      resultList.append(split_sub_dataframe)
22
23
24
25  if __name__ == "__main__":
26
27      # example data generation
28      # the record count of the real data is over 1 billion, with about 10 columns
29      dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30
31
32      print('start...')
33      start_time = time.time()
34
35      # to run 5 processes in parallel, split the DataFrame into five sub-DataFrames
36      split_dataframe_list = np.array_split(dataframe, 5)
37
38      # multiprocessing
39      manager = Manager()
40
41      # result list
42      resultList = manager.list()
43      processList = []
44
45      for sub_dataframe in split_dataframe_list:
46          process = Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47          processList.append(process)
48
49      for proc in processList:
50          proc.start()
51      for proc in processList:
52          proc.join()
53
54
55      print('elapsed time : ', np.round(time.time() - start_time, 3))