3

I would I go about creating a new column that is the result of a groupby and apply of another column while keeping the order of the dataframe (or at least be able to sort it back).

example: I want to normalize a signal column by group

import dask
import numpy as np
import pandas as pd
from dask import dataframe

def normalize(x):
    return ((x - x.mean())/x.std())


data = np.vstack([np.arange(2000), np.random.random(2000), np.round(np.linspace(0, 10, 2000))]).T
df = dataframe.from_array(data, columns=['index', 'signal', 'id_group'], chunksize=100)
df = df.set_index('index')

normalized_signal =  df.groupby('id_group').signal.apply(normalize, meta=pd.Series(name='normalized_signal_by_group'))
normalized_signal.compute()

I do get the right series, but the index is shuffled. I do I get this series back in the dataframe?

I tried

df['normalized_signal'] = normalized_signal
df.compute()

but I get

ValueError: Not all divisions are known, can't align partitions. Please use set_index to set the index.


I also tried a merge, but my final dataframe ends up shuffled with no easy way to resort along the index

df2 = df.merge(normalized_signal.to_frame(), left_index=True, right_index=True, how='left')
df2.compute()

It works when I compute the series than sort_index() in pandas but that doesn't seem efficient.

df3 = df.merge(normalized_signal.to_frame().compute().sort_index(), left_index=True, right_index=True, how='left')
df3.compute()

The equivalent pandas way is :

df4 = df.compute()
df4['normalized_signal_by_group'] = df4.groupby('id_group').signal.transform(normalize)
df4
aL_eX
  • 1,453
  • 2
  • 15
  • 30
AlexFC
  • 68
  • 6
  • Would using "delayed" to query grouped chunk of the database, do the math, than use "from delayed" be an efficient solution? – AlexFC Dec 08 '17 at 14:33

1 Answers1

2

Unfortunately transform is not implemented in dask yet. My (ugly) workaround is:

import numpy as np
import pandas as pd
import dask.dataframe as dd

pd.options.mode.chained_assignment = None

def normalize(x):
    return ((x - x.mean())/x.std())

def dask_norm(gp):
    gp["norm_signal"] = normalize(gp["signal"].values)
    return(gp.as_matrix())

data = np.vstack([np.arange(2000), np.random.random(2000), np.round(np.linspace(0, 10, 2000))]).T
df = dd.from_array(data, columns=['index', 'signal', 'id_group'], chunksize=100)
df1 = df.groupby("id_group").apply(dask_norm, meta=pd.Series(name="a") )
df2 = df1.to_frame().compute()
df3 = pd.concat([pd.DataFrame(a) for a in df2.a.values])
df3.columns = ["index", "signal", "id_group", "normalized_signal_by_group"]
df3.sort_values("index", inplace=True)
rpanai
  • 12,515
  • 2
  • 42
  • 64