
I'm learning dask and I keep getting this error here and there:

InvalidIndexError: Reindexing only valid with uniquely valued Index objects

I have a preprocessed dask df, and I can do a lot of operations with it, but a few simple ones throw this error.

I tried resetting the index, but it didn't seem to help.

Question: is there a high-level answer to what might be wrong? Is there anywhere I can read about it? Why does it usually happen? Googling didn't help.

For example, here is a weird one. I check the df's stats after each operation to see if anything looks suspicious:

df = load_data()
df.shape[0].compute(), df.npartitions
#ok

df = prepare_target(df)
df.shape[0].compute(), df.npartitions
#ok

x_train, x_test, y_train, y_test = dask_tts(df.drop('target', axis=1), df['target'], random_state=1)
#ok

x_train['target'] = y_train
x_test['target'] = y_test
#ok

x_train.shape[0].compute(), x_train.npartitions
x_test.shape[0].compute(), x_test.npartitions
#ok

x_train.index.nunique().compute()
x_test.index.nunique().compute()
#ok

train, smooth_dict = smoothed_likelyhood(x_train) # returns df and dict
train.shape[0].compute()
#ok

test, _ = smoothed_likelyhood(x_test)
test.shape[0].compute()
#ok

train.index.nunique().compute()
#ok
test.index.nunique().compute() # after this line - error
# InvalidIndexError: Reindexing only valid with uniquely valued Index objects

To point out: only test throws the error.

Here I tried to reproduce it, but it works as it should:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split

def smoothed_likelyhood(df, alpha=1): # works with dask df
    global_mean = df['target'].mean()
    smooth_dict = {}

    final_df = df.copy()

    for c in [c for c in df.columns if c!='target']:
        n_rows = df[c].value_counts()
        all_means = df.groupby(by=c)['target'].mean()
        temp_result = ((all_means * n_rows + global_mean + alpha) / (n_rows + alpha))    
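        # NB: the next line maps with the *lazy* dask Series temp_result --
        # on real data this is what later triggers the InvalidIndexError
        # (see the answer below)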
        final_df[c] = df[c].map(temp_result)
        smooth_dict[c] = temp_result.compute().to_dict()
        
    return final_df, smooth_dict

# TOY EXAMPLE
test = pd.DataFrame({'a':['mos', 'mos', 'london', 'dubai', 'ny', 'mos', 'london', 'dubai', 'shanghai', 'dubai', 'mos', 'london', 'dubai', 'dubai'],
                     'b':['big', 'big', 'big', 'med', 'med', 'med', 'small', 'small', 'small', 'small', 'big', 'med', 'small', 'med'],
                     'target':[1,0,0,1,0,1,1,0,1,1,1,0,0,0]}) 

df = dd.from_pandas(test, npartitions=2)
# -----------------------------------------------

print(f'npartitions: {df.npartitions}')
x_train, x_test, y_train, y_test = train_test_split(df.drop('target', axis=1), df['target'], test_size=0.3, shuffle=True)

x_train['target'] = y_train
x_test['target'] = y_test

print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())

train, smooth_dict = smoothed_likelyhood(x_train)
test, _ = smoothed_likelyhood(x_test)

print(train.shape[0].compute(), train.index.nunique().compute())
print(test.shape[0].compute(), test.index.nunique().compute())

# train.compute()

print(train['target'].mean().compute())
print(test['target'].mean().compute())

This part worked, but then I tried doing the same with real data:

%%time
df = load_data(stage='prep_2', frac=config.FRAC, verbose=False) # loading many parquet files
df = prepare_target(df) # some small changes to df
# -----------------------------------------------

print(f'npartitions: {df.npartitions}')
x_train, x_test, y_train, y_test = train_test_split(df.drop('target', axis=1), df['target'], random_state=1)

x_train['target'] = y_train
x_test['target'] = y_test

print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())

train, smooth_dict = smoothed_likelyhood(x_train)
test, _ = smoothed_likelyhood(x_test)

print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())

These are the results of prints:

npartitions: 10
1476758 164300
164018 106750
1476758 164300
164018 106750

and either of these two will throw the index error mentioned above (note that in both frames the number of unique index values is well below the row count, i.e. both indices contain duplicates):

train['target'].mean().compute()
test['target'].mean().compute()

If you have any ideas, I'd be happy to investigate further; I just don't know where to look.

Thanks.

Comments:
  • It seems like you have index values which are duplicates. Could you post your data here? You might want to try something proposed here: https://stackoverflow.com/a/64749574/13592469 . Dask does not have the get index method as far as I know, but you could probably use test.index.compute() to get it as a pandas index. If you post example code so I can test it, I can give you a more specific answer. – n4321d Jun 23 '21 at 17:47
  • Hi, please have a look, I edited the question. P.S. test.index.compute() throws the same index error. – Oleg Peregudov Jun 23 '21 at 20:02
  • Thank you for updating. I still cannot run your code because I don't have all the functions you use (NameError: name 'train_test_split' is not defined), but I will try to help you as best I can. The error clearly indicates that there are duplicates in the index. What is the output of train['target'].head() with your real data? Or if you run train['target'] = train['target'].reset_index()? – n4321d Jun 24 '21 at 00:30
  • Thanks for sticking around. train['target'].head() returns as normal, 0s and 1s. train['target'] = train['target'].reset_index() returns an error: TypeError: Column assignment doesn't support type dask.dataframe.core.DataFrame. I also added a few imports for you to run; the TOY example should be ok now. – Oleg Peregudov Jun 24 '21 at 08:01

1 Answer


The problem was in the smoothed_likelyhood function, at these two lines:

final_df[c] = df[c].map(temp_result)
smooth_dict[c] = temp_result.compute().to_dict()

It was something to do with the dask series temp_result.

Usually, with pandas, if I want to replace (map) some values in a df with other values, I would do something like this:

mapping = {'a':{'a1':1, 'a2':2...}, 'b':{'b1':1, 'b2':2...}}
for c in df.columns: # df.columns = ['a', 'b', ...]
    df[c] = df[c].map(mapping[c].get) # replace the values in place

It is super fast.

But here, in dask, I couldn't turn temp_result into a dict without computing it first. I tried a few things and, surprisingly, a simple df[c].map(series) worked, i.e. mapping with a series instead of a dict. Ok, but that introduced the super sneaky problem with the invalid index.
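
To illustrate the mechanism (a minimal pandas sketch of what I believe is going on, not the exact dask internals): pandas raises this exact error when you map with a Series whose index contains duplicates, while mapping with a dict works fine:

import pandas as pd

mapper = pd.Series([1, 2], index=['a', 'a'])  # mapper with a duplicated index
data = pd.Series(['a', 'b'])

data.map(mapper.to_dict())  # works: dict mapping is fine
data.map(mapper)            # InvalidIndexError: Reindexing only valid
                            # with uniquely valued Index objects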

The solution was to map with the computed dict (smooth_dict), as with pandas, instead of with the series (temp_result). I think it is suboptimal to compute something this early in dask, but I need the results anyway, and as far as I know it is only computed once here when I call it.

Here is a working function:

def smoothed_likelyhood(df, alpha=1):
    """
    Description:
        preprocess based on mean values of positive events in each category
    Args:
        df: [df] - df to encode
        alpha: [int/float] - regularization param. We can find it with CV
    Returns:
        encoded df, plus the dict used to encode new data at prediction time
    """
    
    global_mean = df['target'].mean()
    smooth_dict = {}

    for c in [c for c in df.columns if c!='target']:
        n_rows = df[c].value_counts()
        all_means = df.groupby(by=c)['target'].mean()
        temp_result = ((all_means * n_rows + global_mean + alpha) / (n_rows + alpha))
        smooth_dict[c] = temp_result.compute().to_dict()
        df[c] = df[c].map(smooth_dict[c])
        
    return df, smooth_dict
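
For example, running it on the toy frame from the question (just a sketch; test and dd are as defined in the toy example above):

df = dd.from_pandas(test, npartitions=2)
train, smooth_dict = smoothed_likelyhood(df)
print(train.compute())   # now computes without the index error
print(smooth_dict['a'])  # learned encoding for column 'a'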

Still, it takes 25 minutes to chew through an 800k × 90 dataframe, which is only 5% of my data. If anyone can suggest how to speed this function up in dask, it would be much appreciated.
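
One idea, as a sketch only (smoothed_likelyhood_batched is a hypothetical name, untested on the real data): the loop above calls .compute() once per column, so dask evaluates a separate graph for every feature. Building all the per-column aggregates lazily first and evaluating them with a single dask.compute() call lets dask share work between them:

import dask

def smoothed_likelyhood_batched(df, alpha=1):
    # same encoding as above, but every per-column aggregate is built
    # lazily first and then computed in one dask.compute() call
    global_mean = df['target'].mean()
    cols = [c for c in df.columns if c != 'target']

    aggs = []
    for c in cols:
        n_rows = df[c].value_counts()
        all_means = df.groupby(by=c)['target'].mean()
        aggs.append((all_means * n_rows + global_mean + alpha) / (n_rows + alpha))

    computed = dask.compute(*aggs)  # one evaluation instead of one per column
    smooth_dict = {c: s.to_dict() for c, s in zip(cols, computed)}

    for c in cols:
        df[c] = df[c].map(smooth_dict[c])

    return df, smooth_dict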
