
Here is the code I am using:

import pandas as pd
import sys, multiprocessing

train_data_file = '/home/simon/ali_bigdata/train_data_user_2.0.csv'
user_list_file = '/home/simon/ali_bigdata/user_list.txt'



def feature_extract(list_file, feature_extract_func):
    tmp_list = [line.strip() for line in open(list_file)]

    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    results_list = pool.map(feature_extract_func, tmp_list)

    for tmp in results_list:
        for i in tmp:
            print i,"\t",
        print "\n"

    pool.close()
    pool.join()

def user_feature(tmp_user_id):
    sys.stderr.write("process user " + tmp_user_id + " ...\n")
    try:
        tmp_user_df = df_user.loc[int(tmp_user_id)]
    except KeyError:
        return [tmp_user_id, 0, 0, 0.0]
    else:
        if type(tmp_user_df) == pd.core.series.Series:
            tmp_user_click = 1
        else:
            (tmp_user_click, suck) = tmp_user_df.shape

        tmp_user_buy_df = tmp_user_df.loc[tmp_user_df['behavior_type'] == 4]
        if type(tmp_user_buy_df) == pd.core.frame.DataFrame:
            tmp_user_buy = 1
        else:
            (tmp_user_buy, suck) = tmp_user_buy_df.shape


        return [tmp_user_id, tmp_user_click, tmp_user_buy, 0.0 if tmp_user_click == 0 else float(tmp_user_buy)/tmp_user_click]


df = pd.read_csv(train_data_file, header=0)
df_user = df.set_index(['user_id'])
feature_extract(user_list_file, user_feature)

The error I'm getting is:

process user 102761946 ...
process user 110858443 ...
process user 131681429 ...
Traceback (most recent call last):
  File "extract_feature_2.0.py", line 53, in <module>
    feature_extract(user_list_file, user_feature)
  File "extract_feature_2.0.py", line 13, in feature_extract
    results_list = pool.map(feature_extract_func, tmp_list)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
KeyError: 'the label [False] is not in the [index]'

It occurs after the program has been running for a while.

So what does this error mean, and how can I run this map function with multiprocessing?

Here is the input data format:

user_id,item_id,behavior_type,user_geohash,item_category,date,time
99512554,37320317,3,94gn6nd,9232,2014-11-26,20
9909811,266982489,1,,3475,2014-12-02,23
98692568,27121464,1,94h63np,5201,2014-11-19,13
simon_xia

2 Answers


It is hard to debug errors inside functions used in multiprocessing. You should turn off multiprocessing to debug it, then turn it back on when it is fixed. I usually have a mp=True argument in my function that runs the function in multiprocessing mode by default but can be set to False to run it with a regular, non-multiprocessing map (using an if test) so I can debug these sorts of errors.

So you could set up your function like this, and run it with an mp=False argument to debug it:

def feature_extract(list_file, feature_extract_func, mp=True):
    tmp_list = [line.strip() for line in open(list_file)]

    if mp:
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        results_list = pool.map(feature_extract_func, tmp_list)
    else:
        results_list = map(feature_extract_func, tmp_list)

    for tmp in results_list:
        for i in tmp:
            print i,"\t",
        print "\n"

    if mp:
        pool.close()
        pool.join()

Also, Pool automatically uses the number of available cpus by default, so you don't need to set the number of processes unless you want something different from that.
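
As a rough illustration of the same toggle in Python 3 syntax, here is a minimal sketch. The names `run_map` and `square` are hypothetical stand-ins and not part of the question's code; `square` just takes the place of a real worker such as `user_feature`.

```python
import multiprocessing


def square(x):
    # Hypothetical stand-in for a real worker such as user_feature.
    return x * x


def run_map(func, items, mp=True):
    """Apply func to every item, in a process pool or serially."""
    if mp:
        # Pool() with no argument starts one worker per available CPU.
        with multiprocessing.Pool() as pool:
            return pool.map(func, items)
    # Serial path: an exception surfaces with the worker's own traceback.
    return [func(item) for item in items]


if __name__ == "__main__":
    # Debug run without multiprocessing.
    print(run_map(square, [1, 2, 3], mp=False))  # [1, 4, 9]
```

Calling `run_map(square, items)` with the default `mp=True` uses the pool; passing `mp=False` keeps everything in one process, so the failing line shows up directly in the traceback.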

Also, it is more memory efficient to use a generator expression instead of a list comprehension in this case. A list is easier to slice, though, so while debugging you might want to keep the list comprehension so you can jump ahead to the index causing the problem.

So, once the debugging is done, replace:

tmp_list = [line.strip() for line in open(list_file)]

with:

tmp_list = (line.strip() for line in open(list_file))
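
If you still want to jump ahead while using the generator form, `itertools.islice` can skip entries without building a list first. This is only a sketch: the `io.StringIO` object stands in for `open(list_file)`, and the IDs in it are made up for illustration.

```python
import io
from itertools import islice

# Stand-in for open(list_file); the IDs are invented for this example.
fake_file = io.StringIO("101\n102\n103\n104\n105\n")

lines = (line.strip() for line in fake_file)

# Skip the first 3 entries and take the next 2, without building a full list.
window = list(islice(lines, 3, 5))
print(window)  # ['104', '105']
```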
TheBlackCat
  • With the help of your debug method, I finally fixed it ! thanks : ) – simon_xia Apr 07 '15 at 07:04
  • @simon_xia: what was the problem in the end? – mhawke Apr 07 '15 at 07:37
  • @mhawke It was caused by the return value of `df_user.loc[int(tmp_user_id)]`: it may be a Series when only one row matches, so the statement `tmp_user_buy_df = tmp_user_df.loc[tmp_user_df['behavior_type'] == 4]` breaks. – simon_xia Apr 11 '15 at 01:11

You haven't shown any of the data that is in play when the error occurs. Please post representative data that triggers the problem in your question - it will be much easier to help you if your problem can be reproduced.

I think that the error is happening at this line:

tmp_user_buy_df = tmp_user_df.loc[tmp_user_df['behavior_type'] == 4]

tmp_user_df['behavior_type'] == 4 evaluates to a single boolean, True or False, which .loc then treats as a label. Because False is not a label in the index of the data frame/series, KeyError: 'the label [False] is not in the [index]' is raised. I am puzzled as to why the True case apparently works, but we haven't seen your data, so there might be an explanation there.
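
Going by the asker's follow-up comment, the single boolean appears when a user has exactly one row, because `.loc` with a scalar label then returns a Series instead of a DataFrame. The sketch below is written for Python 3 and uses a tiny made-up table with the question's column names. It shows one possible way around the problem, passing the label in a list so `.loc` always returns a DataFrame. It is not necessarily the fix the asker used.

```python
import pandas as pd

# Made-up sample data; only the column names come from the question.
df = pd.DataFrame({
    "user_id": [1, 1, 2],
    "item_id": [10, 11, 12],
    "behavior_type": [1, 4, 4],
})
df_user = df.set_index("user_id")

single = df_user.loc[2]  # one matching row: a Series
multi = df_user.loc[1]   # several matching rows: a DataFrame


def user_feature(user_id):
    """Return [user_id, row count, buy count, buy ratio] for one user."""
    try:
        # A list of labels makes .loc return a DataFrame even for one row.
        user_df = df_user.loc[[int(user_id)]]
    except KeyError:
        return [user_id, 0, 0, 0.0]
    clicks = len(user_df)
    # Boolean mask over the column, summed to count the matching rows.
    buys = int((user_df["behavior_type"] == 4).sum())
    return [user_id, clicks, buys, float(buys) / clicks]
```

With this shape there is no need to branch on the type of the intermediate result, since both the row count and the buy count come from a DataFrame in every case.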

You might have meant to pass a boolean array as the selector; if so, wrap the behaviour type lookup in a list, e.g.:

tmp_user_buy_df = tmp_user_df.loc[[tmp_user_df['behavior_type'] == 4]]

Also, isinstance() is preferred over type(x) == X; see this comprehensive explanation. You can change the lines

if type(tmp_user_df) == pd.core.series.Series:

to

if isinstance(tmp_user_df, pd.core.series.Series):

and

if type(tmp_user_buy_df) == pd.core.frame.DataFrame:

to

if isinstance(tmp_user_buy_df, pd.core.frame.DataFrame):
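
For context, the general difference between the two checks only shows up with subclasses. A minimal sketch, using the hypothetical classes `Base` and `Child` rather than any pandas types:

```python
class Base(object):
    pass


class Child(Base):
    pass


obj = Child()

# type() compares the exact class, so a subclass instance does not match.
exact_match = type(obj) == Base

# isinstance() also accepts instances of subclasses.
instance_match = isinstance(obj, Base)

print(exact_match, instance_match)  # False True
```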
mhawke
  • Sorry for the delay; I have just pasted the data format. What I want to do is select the rows with behavior_type 4 for the same user. – simon_xia Apr 07 '15 at 01:18
  • Thanks for the advice on using `isinstance()`. It works well in other cases but does not work in this one, which is strange. – simon_xia Apr 11 '15 at 01:22
  • The `isinstance()` examples above are equivalent to using `type` for the objects used. – mhawke Apr 11 '15 at 02:08