
This is similar to previous questions about how to expand a list-based column across several columns, but the solutions I'm seeing don't seem to work for Dask. Note that the true DFs I'm working with are too large to hold in memory, so converting to pandas first is not an option.

I have a df with a column that contains lists:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'a': [np.random.randint(100, size=4) for _ in range(20)]})
dask_df = dd.from_pandas(df, chunksize=10)
dask_df['a'].compute()

0     [52, 38, 59, 78]
1     [79, 71, 13, 63]
2     [15, 81, 79, 76]
3      [53, 4, 94, 62]
4     [91, 34, 26, 92]
5      [96, 1, 69, 27]
6     [84, 91, 96, 68]
7     [93, 56, 45, 40]
8      [54, 1, 96, 76]
9      [27, 11, 79, 7]
10    [27, 60, 78, 23]
11    [56, 61, 88, 68]
12    [81, 10, 79, 65]
13     [34, 49, 30, 3]
14    [32, 46, 53, 62]
15    [20, 46, 87, 31]
16      [89, 9, 11, 4]
17    [26, 46, 19, 27]
18    [79, 44, 45, 56]
19    [22, 18, 31, 90]
Name: a, dtype: object

According to this solution, if this were a pd.DataFrame I could do something like this:

new_dask_df = dask_df['a'].apply(pd.Series)

ValueError: The columns in the computed data do not match the columns in the provided metadata
Extra:   [1, 2, 3]
Missing: []

There's another solution listed here:

import dask.array as da
import dask.dataframe as dd
x = da.ones((4, 2), chunks=(2, 2))
df = dd.io.from_dask_array(x, columns=['a', 'b'])
df.compute()

So for dask I tried:

df = dd.io.from_dask_array(dask_df.values)

but that just spits out the same DF I had before (screenshot: https://i.stack.imgur.com/T099A.png).

Not really sure why, as the types of the example x and of the values in my df are the same:

print(type(dask_df.values), type(x))
<class 'dask.array.core.Array'> <class 'dask.array.core.Array'>
print(type(dask_df.values.compute()[0]), type(x.compute()[0]))
<class 'numpy.ndarray'> <class 'numpy.ndarray'>
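
The element types match, but the overall shapes and dtypes do not: x is a 2-D numeric array, while dask_df.values is a single object column whose cells each hold a whole list, which likely explains why from_dask_array has nothing to split into separate columns. A quick check (assuming the setup above):

print(x.shape, x.dtype)
# (4, 2) float64
print(dask_df.values.compute().shape, dask_df.values.compute().dtype)
# (20, 1) object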

Edit: I kind of have a working solution, but it involves iterating through each groupby object. It feels like there should be a better way:

# explode the lists so each element gets its own row, then group the
# exploded rows back together by the original index
dask_groups = dask_df.explode('a').reset_index().groupby('index')
final_df = []
for idx in dask_df.index.values.compute():
    # materialize one group and turn it sideways so its values become columns
    group = dask_groups.get_group(idx).drop(columns='index').compute()
    group_size = list(range(len(group)))
    row = group.transpose()
    row.columns = group_size
    row['index'] = idx
    final_df.append(dd.from_pandas(row, chunksize=10))
final_df = dd.concat(final_df).set_index('index')
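
A per-partition version might avoid the per-group compute() calls entirely. Here is a minimal sketch using map_partitions, assuming every list in 'a' has exactly four integer elements:

# empty frame telling dask the output has four int columns
meta = pd.DataFrame({i: pd.Series(dtype=int) for i in range(4)})

# stack each partition's lists into a 2-D array and wrap it as a frame,
# keeping the original index so rows stay aligned
wide = dask_df['a'].map_partitions(
    lambda s: pd.DataFrame(np.stack(s.tolist()), index=s.index),
    meta=meta,
)
wide.compute()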
– Drivebyluna

3 Answers


In this case dask doesn't know what to expect from the outcome, so it's best to specify meta explicitly:


# this is a short-cut that uses the existing pandas df;
# in actual code it is sufficient to provide an empty
# DataFrame with the expected column names and dtypes
meta = df['a'].apply(pd.Series)

new_dask_df = dask_df['a'].apply(pd.Series, meta=meta)
new_dask_df.compute()
– SultanOrazbayev
  • This works, technically, but it assumes I have access to df. In my real program, df doesn't exist; everything exists as a dask df, so in order to create the meta object, I'd have to compute dask_df to begin with, which defeats the purpose. – Drivebyluna Sep 29 '21 at 16:32
  • Not quite, `meta` doesn't need to contain the data, only the column names and dtypes. – SultanOrazbayev Sep 29 '21 at 16:37
  • Thanks, this works:

        columns = list(range(4))
        meta = pd.DataFrame(columns=columns)
        for x in columns:
            meta[x] = meta[x].astype(int)
        new_dask_df = dask_df['a'].apply(pd.Series, meta=meta)

    – Drivebyluna Sep 29 '21 at 17:13

I got a working solution. My original function created a list, which resulted in the column of lists shown above. Changing the applied function to return a dask bag seems to do the trick:

import random
import numpy as np
import pandas as pd
import dask.bag as db
import dask.dataframe as dd

def create_df_row(x):
    vals = np.random.randint(2, size=4)
    return db.from_sequence([vals], partition_size=2).to_dataframe()

test_df = dd.from_pandas(pd.DataFrame({'a': [random.choice(['a', 'b', 'c']) for _ in range(20)]}), chunksize=10)
test_df.head()


mini_dfs = [*test_df.groupby('a')['a'].apply(lambda x: create_df_row(x))]
result = dd.concat(mini_dfs)
result.compute().head()

But I'm not sure if this solves the in-memory issue, as now I'm holding a list of groupby results.

– Drivebyluna

Here's how to expand a list-like column across multiple columns manually:

dask_df["a0"] = dask_df["a"].str[0]
dask_df["a1"] = dask_df["a"].str[1]
dask_df["a2"] = dask_df["a"].str[2]
dask_df["a3"] = dask_df["a"].str[3]

print(dask_df.head())
                  a  a0  a1  a2  a3
0   [71, 16, 0, 10]  71  16   0  10
1  [59, 65, 99, 74]  59  65  99  74
2  [83, 26, 33, 38]  83  26  33  38
3   [70, 5, 19, 37]  70   5  19  37
4    [0, 59, 4, 80]   0  59   4  80
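
If the lists are longer, the same pattern generalizes with a loop. A short sketch, assuming every list in "a" has the same known length n:

# pull each position out with .str[i]; n is assumed known up front
n = 4
for i in range(n):
    dask_df[f"a{i}"] = dask_df["a"].str[i]
# optionally drop the original list column
dask_df = dask_df.drop(columns="a")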

SultanOrazbayev's answer seems more elegant.

– Powers