This is similar to previous questions about how to expand a list-based column across several columns, but the solutions I'm seeing don't seem to work for Dask. Note that the true DFs I'm working with are too large to hold in memory, so converting to pandas first is not an option.
I have a df with a column that contains lists:
import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'a': [np.random.randint(100, size=4).tolist() for _ in range(20)]})
dask_df = dd.from_pandas(df, chunksize=10)
dask_df['a'].compute()
0 [52, 38, 59, 78]
1 [79, 71, 13, 63]
2 [15, 81, 79, 76]
3 [53, 4, 94, 62]
4 [91, 34, 26, 92]
5 [96, 1, 69, 27]
6 [84, 91, 96, 68]
7 [93, 56, 45, 40]
8 [54, 1, 96, 76]
9 [27, 11, 79, 7]
10 [27, 60, 78, 23]
11 [56, 61, 88, 68]
12 [81, 10, 79, 65]
13 [34, 49, 30, 3]
14 [32, 46, 53, 62]
15 [20, 46, 87, 31]
16 [89, 9, 11, 4]
17 [26, 46, 19, 27]
18 [79, 44, 45, 56]
19 [22, 18, 31, 90]
Name: a, dtype: object
According to this solution, if this were a pd.DataFrame I could do something like this:
new_dask_df = dask_df['a'].apply(pd.Series)

Running that raises:

ValueError: The columns in the computed data do not match the columns in the provided metadata
Extra: [1, 2, 3]
Missing: []
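From the error, my guess is that Dask infers the output columns from a small sample and only sees a single column, while the computed result has four. Passing an explicit meta might get around that, though hard-coding the column count feels brittle. A sketch, assuming every list has exactly four int64 elements:

new_dask_df = dask_df['a'].apply(pd.Series, meta={i: 'int64' for i in range(4)})
new_dask_df.compute()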
There's another solution listed here:
import dask.array as da
import dask.dataframe as dd
x = da.ones((4, 2), chunks=(2, 2))
df = dd.io.from_dask_array(x, columns=['a', 'b'])
df.compute()
So for dask I tried:
df = dd.io.from_dask_array(dask_df.values)
but that just spits out the same DF I had before (screenshot: https://i.stack.imgur.com/T099A.png).
I'm not really sure why, since the types of the example x and of the values in my df are the same:
print(type(dask_df.values), type(x))
<class 'dask.array.core.Array'> <class 'dask.array.core.Array'>
print(type(dask_df.values.compute()[0]), type(x.compute()[0]))
<class 'numpy.ndarray'> <class 'numpy.ndarray'>
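What does differ, as far as I can tell, is shape and dtype: x is a 2-D numeric array, whereas dask_df.values is a one-column object array whose elements are whole lists, so from_dask_array has no second axis to unpack into columns. A quick check (expected output shown as comments; the exact shapes are my assumption):

print(x.shape, x.dtype)
# (4, 2) float64
print(dask_df.values.shape, dask_df.values.dtype)
# (nan, 1) object  -- row count unknown until computed; one object column holding lists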
Edit: I kind of have a working solution, but it involves iterating through each groupby object. It feels like there should be a better way:
# explode the lists so each element gets its own row, keyed by the original index
dask_groups = dask_df.explode('a').reset_index().groupby('index')
final_df = []
for idx in dask_df.index.values.compute():
    # pull one original row's elements, then pivot them back into columns
    group = dask_groups.get_group(idx).drop(columns='index').compute()
    group_size = list(range(len(group)))
    row = group.transpose()
    row.columns = group_size
    row['index'] = idx
    final_df.append(dd.from_pandas(row, chunksize=10))
final_df = dd.concat(final_df).set_index('index')
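For comparison, the closest I've gotten to a vectorized version is a map_partitions sketch like the one below. It assumes every list has the same known length (4 here, hard-coded in the meta), and I haven't verified it's the idiomatic approach, which is really what I'm asking:

import pandas as pd

# Sketch under a fixed-length assumption: turn each partition's column of
# lists into an ordinary 2-D pandas DataFrame so the whole pipeline stays lazy.
expanded = dask_df['a'].map_partitions(
    lambda part: pd.DataFrame(part.tolist(), index=part.index),
    meta={i: 'int64' for i in range(4)},
)
print(expanded.compute())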