
We have a dataset of roughly 1.5MM rows that I would like to process in parallel. The main job of the code is to look up master information and enrich those 1.5MM rows. The master is a two-column dataset with roughly 25,000 rows. However, I am unable to make the multiprocessing work and test its scalability properly. Can someone please help? A cut-down version of the code is as follows:

import pandas
from multiprocessing import Pool

def work(data):
    mylist =[]
    #Business Logic
    return mylist.append(data)

if __name__ == '__main__':
    data_df = pandas.read_csv('D:\\retail\\customer_sales_parallel.csv',header='infer')
    print('Source Data :', data_df)
    agents = 2
    chunksize = 2
    with Pool(processes=agents) as pool:
        result = pool.map(func=work, iterable=data_df, chunksize=20)
        pool.close()
        pool.join()
    print('Result :', result)

The work method will hold the business logic, and I would like to pass partitioned slices of data_df into work to enable parallel processing. The sample data is as follows:

CUSTOMER_ID,PRODUCT_ID,SALE_QTY
641996,115089,2
1078894,78144,1
1078894,121664,1
1078894,26467,1
457347,59359,2
1006860,36329,2
1006860,65237,2
1006860,121189,2
825486,78151,2
825486,78151,2
123445,115089,4

Ideally, I would like to process 6 rows in each partition.

Please help.

Thanks and Regards

Bala

Balaji Krishnan
  • Why are you passing a dataframe to multiprocessing that involves appending to lists? You haven't given enough info to answer this properly, but you'll probably either `join` or `merge` the data sources. It'll be faster than multiprocessing too. – roganjosh Oct 11 '17 at 19:13
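The join/merge approach suggested in the comment can be sketched as follows. The master file's layout isn't shown in the question, so the `PRODUCT_NAME` column and the sample values here are assumptions; the point is that a single vectorized `merge` enriches every sales row without any multiprocessing:

```python
import pandas as pd

# Hypothetical master lookup table: PRODUCT_ID -> PRODUCT_NAME.
# The real master file's columns aren't given in the question.
master_df = pd.DataFrame({
    'PRODUCT_ID': [115089, 78144, 121664],
    'PRODUCT_NAME': ['Widget', 'Gadget', 'Gizmo'],
})

# A few rows from the sample sales data in the question.
sales_df = pd.DataFrame({
    'CUSTOMER_ID': [641996, 1078894, 1078894],
    'PRODUCT_ID': [115089, 78144, 121664],
    'SALE_QTY': [2, 1, 1],
})

# A left merge keeps every sales row and attaches the matching
# master columns; the join runs in vectorized C code, which is
# usually far faster than splitting the work across processes.
enriched = sales_df.merge(master_df, on='PRODUCT_ID', how='left')
print(enriched)
```

Rows whose `PRODUCT_ID` has no match in the master table simply get `NaN` in the added columns, which is easy to check afterwards.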

2 Answers


First, work is returning the output of mylist.append(data), which is None. I assume (and if not, I suggest) you want to return a processed DataFrame.

To distribute the load, you could use numpy.array_split to split the large DataFrame into a list of 6-row DataFrames, which are then processed by work.

import pandas
import math
import numpy as np
from multiprocessing import Pool

def work(data):
    #Business Logic
    return data # Return it as a DataFrame

if __name__ == '__main__':
    data_df = pandas.read_csv('D:\\retail\\customer_sales_parallel.csv',header='infer')
    print('Source Data :', data_df)
    agents = 2
    rows_per_workload = 6
    num_loads = math.ceil(data_df.shape[0]/float(rows_per_workload))
    split_df = np.array_split(data_df, num_loads) # A list of DataFrames
    with Pool(processes=agents) as pool:
        result = pool.map(func=work, iterable=split_df)
        result = pandas.concat(result) # Stitch them back together
        pool.close()
        pool.join()
    print('Result :', result)
Brenden Petersen

My best recommendation is for you to use the chunksize parameter in read_csv (Docs) and iterate over the chunks. This way you won't exhaust your RAM trying to load everything at once, and if you want you can, for example, use threads to speed up the process.

for i, chunk in enumerate(pd.read_csv('bigfile.csv', chunksize=500000)):
    pass  # business logic for each chunk goes here

I'm not sure if this answers your specific question, but I hope it helps.
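A self-contained sketch of the chunked-reading loop above; an in-memory buffer stands in for the question's CSV file so the example runs as-is (in practice you would pass the file path to read_csv instead):

```python
import io
import pandas as pd

# A few sample rows from the question, held in memory so the
# sketch is self-contained; normally this would be the CSV path.
csv_data = io.StringIO(
    "CUSTOMER_ID,PRODUCT_ID,SALE_QTY\n"
    "641996,115089,2\n"
    "1078894,78144,1\n"
    "1078894,121664,1\n"
    "457347,59359,2\n"
)

results = []
# chunksize=2 yields DataFrames of at most 2 rows each, so the
# whole file never needs to fit in memory at once.
for i, chunk in enumerate(pd.read_csv(csv_data, chunksize=2)):
    # Business logic per chunk, e.g. a merge against the master table
    results.append(chunk)

result = pd.concat(results, ignore_index=True)
print(len(result))  # -> 4
```

Each chunk is an ordinary DataFrame, so the per-chunk body can be the same enrichment logic (a merge, for instance) that would run on the full dataset.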

Marco
  • Unfortunately I doubt using Python threads will speed up the process, due to the GIL. Multiprocessing is the only "real" way to use parallelism in Python. – TypeKazt Oct 11 '17 at 19:09
  • @marco Typekazt Thanks for your response. I would like to multi-process and not multi-thread, please – Balaji Krishnan Oct 11 '17 at 19:37
  • @BalajiKrishnan Then you have these docs: https://docs.python.org/3.6/library/multiprocessing.html, which I didn't know about, but from what I read it's really similar to threading (on the outside at least) – Marco Oct 12 '17 at 12:15