I'm using Python's multiprocessing library to run feature selection in parallel for a machine learning problem. The feature-selection function accepts a pandas DataFrame as input and returns some figures.

When I run this function with mp.Pool.map(), everything runs smoothly. However, if I replace it with mp.pool.ThreadPool.map(), it fails with this error:

AssertionError: Number of manager items must equal union of block items # manager items: 15, # tot_items: 20.

Strangely, the ThreadPool code ran without problems until yesterday. When I re-ran it, I started getting these errors. I need ThreadPool because this is an IO-bound job and it ran much faster than with Pool.

EDIT: The code looks like this (Python 2.7):

import multiprocessing as mp
import multiprocessing.pool  # explicit import so mp.pool.ThreadPool resolves
import pandas as pd  # version 0.22.0

def main_functionality(df, params):
    df = df[params['feature']]
    #Run 5-fold cross-validation
        data_df = pd.DataFrame(....)
        pred_df = pred_df.append(data_df)
    return statistics from pred_df

def a_function(args):
    # pool.map passes one argument per task, so unpack the tuple here
    df_init, feature, params_init = args

    # Each thread works on its own copy of the params dict and the dataframe
    params = dict(params_init)
    df = df_init.copy()

    params['feature'] = feature
    try:
        results = main_functionality(df, params)
    except Exception:
        results = (0, 0, 0)

    return results

def b_function(df, features):
    pool = mp.pool.ThreadPool(4)
    params = {...}
    results = pool.map(a_function, ((df, f, params) for f in features))

    results_df = pd.DataFrame(results)
    results_df.to_csv(...)

if __name__ == '__main__':
    df = pd.read_csv(...)  # A big CSV file (i.e. a few GB)
    features = [i for i in df.columns if i ....]

    b_function(df, features)
Stergios
  • Do you have an assert somewhere in your code? Can you post some part of it? – Mikhail Burshteyn Sep 07 '18 at 06:47
  • that error message is from pandas not multiprocessing. – georgexsh Sep 07 '18 at 07:52
  • Your question seems like a duplicate of this: https://stackoverflow.com/questions/35137952/pandas-concat-failing It probably has nothing to do with `ThreadPool` vs multiprocessing but simply you changed data and now pandas is raising that error. – Giacomo Alzetta Sep 07 '18 at 07:54
  • @MikhailBurshteyn Added some pseudocode above. – Stergios Sep 07 '18 at 08:56
  • Change your **pseudocode** to [mcve] – stovfl Sep 07 '18 at 10:34
  • @stovfl Thanks, I wish I knew which lines of code produce the problem so I could provide a minimal example. Unfortunately, I do not. – Stergios Sep 07 '18 at 10:37
  • Are you aware about: **This module is OBSOLETE. Please DO NOT USE IT FOR NEW PROJECTS!** [threadpool 1.3.2](https://pypi.org/project/threadpool/) – stovfl Sep 07 '18 at 11:00
  • @stovfl I'm using that module. I'm using `multiprocessing` library. – Stergios Sep 07 '18 at 11:24
  • Do you mean: "I don't use package threadpool"?. **Edit your Question** and tell which **python version** and prepend your `import` lines to your code. – stovfl Sep 07 '18 at 12:01
  • You're right. My mistake. I'm NOT using `threadpool` package. I updated the pseudocode with pandas version and python version. – Stergios Sep 07 '18 at 13:51
  • code looks a bit like you could modify the same `pred_df` from multiple threads. this might randomly break depending on how threads get scheduled. using processes should be fine as each process will have its own copy of the whole object – Sam Mason Sep 07 '18 at 14:18
  • @SamMason You were right. I had to add the first two lines in `a_function()` so that each thread is now working on its own copy of the params dict and the pandas dataframe. – Stergios Sep 08 '18 at 15:07
  • Everything now working properly. – Stergios Sep 08 '18 at 15:08
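
The fix described in the last few comments can be sketched with the standard library alone. This is a minimal illustration, not the original code: `select_feature`, `worker`, `run`, and the `{'folds': 5}` settings are hypothetical stand-ins, and a plain dict takes the place of the DataFrame. The point it shows is that `ThreadPool` workers all receive the very same objects, so each task should take a private copy before mutating anything, whereas a process pool hands every worker its own pickled copy.

```python
import copy
import time
from multiprocessing.pool import ThreadPool


def select_feature(params):
    # Hypothetical stand-in for the real work: pause briefly, as an
    # IO-bound task would, then read the feature name back.
    time.sleep(0.01)
    return params['feature']


def worker(args):
    # pool.map hands each task a single argument, so unpack the tuple.
    shared_params, feature = args
    # Threads share memory: copy before mutating so that tasks cannot
    # overwrite each other's 'feature' entry.
    params = copy.deepcopy(shared_params)
    params['feature'] = feature
    return feature, select_feature(params)


def run(features, n_threads=4):
    shared_params = {'folds': 5}  # hypothetical settings
    pool = ThreadPool(n_threads)
    try:
        results = pool.map(worker, [(shared_params, f) for f in features])
    finally:
        pool.close()
        pool.join()
    return results, shared_params


if __name__ == '__main__':
    results, shared_params = run(['a', 'b', 'c', 'd'])
    print(results)
    print(shared_params)
```

With the copy in place, every task reads back the feature it was given and the shared dict is left unmodified. For a DataFrame, the analogous step is `df_init.copy()`, as in `a_function()` above.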

0 Answers