
I want to run a function concurrently using `concurrent.futures` in Python. This is the function that I have:

import concurrent.futures
import pandas as pd
import time

def putIndf(file):
    listSel = getline(file)
    datFram = savetoDataFrame(listSel)
    return datFram #datatype : dataframe

def main():
    newData = pd.DataFrame()
    with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
        for i,file in zip(fileList, executor.map(dp.putIndf, fileList)):
            df = newData.append(file, ignore_index=True)
    return df

if __name__ == '__main__':
    main()

I want to join the dataframes into one dataframe, newData, but the result is only the last dataframe from that function.

elisa
    [Never call DataFrame.append or pd.concat inside a for-loop. It leads to quadratic copying.](https://stackoverflow.com/a/36489724/1422451) – Parfait Jun 19 '19 at 14:49
  • [Every time you call append, Pandas returns a copy of the original dataframe plus your new row. This is called quadratic copy.](https://stackoverflow.com/a/37009561/1422451) – Parfait Jun 19 '19 at 14:51
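The quadratic-copy point raised in the comments can be seen in a small sketch (the one-row frames here are illustrative stand-ins for the per-file results, not the asker's real data):

```python
import pandas as pd

# Stand-ins for the per-file frames a function like putIndf would return.
frames = [pd.DataFrame({"a": [i]}) for i in range(5)]

# Quadratic pattern: every iteration copies everything accumulated so far.
slow = pd.DataFrame()
for f in frames:
    slow = pd.concat([slow, f], ignore_index=True)

# Linear pattern: collect first, concatenate exactly once.
fast = pd.concat(frames, ignore_index=True)
```

Both produce the same rows, but the first pattern re-copies the growing frame on every iteration, which is what the linked answers call quadratic copying.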

1 Answer


Essentially you are re-assigning df with each iteration and never growing it. What you probably meant (ill-advised) is to initialize an empty df and append iteratively:

df = pd.DataFrame()
...
df = df.append(file, ignore_index=True)  # note: DataFrame.append was removed in pandas 2.0; use pd.concat instead

Nonetheless, the preferred method is to build a collection of data frames and concatenate them all at once, outside the loop, avoiding growing any complex object like a data frame inside a loop.

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
        # LIST COMPREHENSION
        df_list = [file for i,file in zip(fileList, executor.map(dp.putIndf, fileList))]

        # DICTIONARY COMPREHENSION
        # df_dict = {i:file for i,file in zip(fileList, executor.map(dp.putIndf, fileList))}

    df = pd.concat(df_list, ignore_index=True)
    return df

Alternatively, because of your pool process, append the data frames to a list inside the loop, still concatenating once outside it:

def main():
    df_list = []      # df_dict = {}
    with concurrent.futures.ProcessPoolExecutor(max_workers=30) as executor:
        for i,file in zip(fileList, executor.map(dp.putIndf, fileList)):
            df_list.append(file)
            # df_dict[i] = file

    df = pd.concat(df_list, ignore_index=True)
    return df
Parfait
  • why I got error `BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.`? – elisa Jun 19 '19 at 16:56
  • Possibly the list/dict comprehensions do not work with the pool process. See extended answer for a `list.append` approach. – Parfait Jun 19 '19 at 17:17