How can I use pd.cut() in Dask? Because of the size of the dataset, I am not able to load the whole thing into memory before running pd.cut().

Current code that works in Pandas but needs to be converted to Dask:

import pandas as pd

d = {'name': [1, 5, 1, 10, 5, 1], 'amount': [1, 5, 3, 8, 4, 1]}
df = pd.DataFrame(data=d)

# Group by name and add columns sum (of amounts) and count (number of grouped rows)
df = (df.groupby('name')['amount'].agg(['sum', 'count']).reset_index().sort_values(by='name', ascending=True))
print(df.head(15))

# Group by bins and change sum and count based on the grouped rows
df = df.groupby(pd.cut(df['name'],
           bins=[0, 4, 8, 100],
           labels=['namebin1', 'namebin2', 'namebin3']))[['sum', 'count']].sum().reset_index()
print(df.head(15))

Output:

       name  sum  count
0  namebin1    5      3
1  namebin2    9      2
2  namebin3    8      1

I tried:

import pandas as pd
import dask.dataframe as dd

d = {'name': [1, 5, 1, 10, 5, 1], 'amount': [1, 5, 3, 8, 4, 1]}
df = pd.DataFrame(data=d)
df = dd.from_pandas(df, npartitions=2)

df = df.groupby('name')['amount'].agg(['sum', 'count']).reset_index()
print(df.head(15))

df = df.groupby(df.map_partitions(pd.cut, 
        df['name'],                  
        bins=[0,4,8,100], 
        labels=['namebin1', 'namebin2', 'namebin3']))['sum', 'count'].sum().reset_index()
print(df.head(15))

Gives error: TypeError("cut() got multiple values for argument 'bins'",)

Scripter

1 Answer

The reason you're seeing this error is that map_partitions() passes each partition to pd.cut() as its first positional argument (x), so the df['name'] you pass explicitly ends up in the second positional slot, which is already bins, and collides with the bins= keyword (see the docs).
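
To see the collision concretely, this plain-pandas sketch reproduces the call that each partition ends up making (the partition contents are made up for illustration; the last line is meant to raise the error):

import pandas as pd

# A stand-in for one partition of the aggregated frame
part = pd.DataFrame({'name': [1, 5], 'sum': [5, 9], 'count': [3, 2]})

# map_partitions passes the partition as pd.cut's first positional argument (x),
# so the extra df['name'] lands in the second positional slot, which is already bins:
pd.cut(part, part['name'], bins=[0, 4, 8, 100])
# -> TypeError: cut() got multiple values for argument 'bins'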

You can wrap it in a custom function and call that instead, like so:

import pandas as pd
import dask.dataframe as dd

# Wrapper so that map_partitions can hand each partition to pd.cut as x
def custom_cut(partition, bins, labels):
    result = pd.cut(x=partition["name"], bins=bins, labels=labels)
    return result

d = {'name': [1, 5, 1, 10, 5, 1], 'amount': [1, 5, 3, 8, 4, 1]}
df = pd.DataFrame(data=d)
df = dd.from_pandas(df, npartitions=2)

# Group by name and aggregate sum and count of amounts
df = df.groupby('name')['amount'].agg(['sum', 'count']).reset_index()

# Bin the names partition by partition, then group by the bins
df = df.groupby(df.map_partitions(custom_cut,
        bins=[0, 4, 8, 100],
        labels=['namebin1', 'namebin2', 'namebin3']))[['sum', 'count']].sum().reset_index()

df.compute()

       name  sum  count
0  namebin1    5      3
1  namebin2    9      2
2  namebin3    8      1
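
Optionally, you can also pass meta to map_partitions so dask does not have to infer the output type by running custom_cut on a dummy partition. A hedged sketch of that variant of the last step above (the meta tuple is my assumption about the output: a categorical Series named 'name'):

# Tell dask up front that the result is a categorical Series named 'name'
binned = df.map_partitions(custom_cut,
        bins=[0, 4, 8, 100],
        labels=['namebin1', 'namebin2', 'namebin3'],
        meta=('name', 'category'))

df = df.groupby(binned)[['sum', 'count']].sum().reset_index()
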
gherka
  • Thanks a lot. Works for my small test set, but "Killed" (probably out of memory) for the big one. I didn't use df.compute() but did a print(df.head()) instead, and that failed. How can I groupby without having the whole dataset in memory? – Scripter Sep 22 '20 at 18:57
  • I now see that it is already "Killed" at the previous operation: df = (df.groupby('name')['amount'].agg(['sum', 'count']).reset_index()) (which is also followed by a print(df.head())). I found out that this print triggers the calculation, so I dropped the prints and am re-running now. – Scripter Sep 22 '20 at 19:04
  • A general question about out of memory workflows in `pandas` has a good selection of answers [here](https://stackoverflow.com/questions/14262433/large-data-work-flows-using-pandas). – gherka Sep 23 '20 at 08:20
  • From what I understand, the code you provided doesn't use pandas but dask until the df.compute(), right? Without any prints in between, it is killed because of memory shortage right there. That seems strange to me, because everything should be on disk until a 3-row, 3-column df is produced? – Scripter Sep 23 '20 at 18:04
  • Not quite. `dask` will build a graph of tasks to do (ideally in parallel), and when you either call `compute()` or an implicit equivalent like `head()`, it will try to execute it. The reason it's fast up to the `compute()` point is that creating a task is fast... execution will still happen in memory / distributed across workers if you have them. – gherka Sep 23 '20 at 21:11
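
To make gherka's point about lazy evaluation concrete, here is a minimal sketch of that workflow; the file name 'big.csv' and its columns are placeholders for the real data, which never appears in this thread:

import pandas as pd
import dask.dataframe as dd

def custom_cut(partition, bins, labels):
    return pd.cut(x=partition['name'], bins=bins, labels=labels)

# Reading lazily: nothing is loaded yet, dask only records one task per chunk
df = dd.read_csv('big.csv')

# Still cheap: these calls only extend the task graph
df = df.groupby('name')['amount'].agg(['sum', 'count']).reset_index()
df = df.groupby(df.map_partitions(custom_cut,
        bins=[0, 4, 8, 100],
        labels=['namebin1', 'namebin2', 'namebin3']))[['sum', 'count']].sum().reset_index()

# Only here does execution (and the memory use) actually happen;
# head() or print(df.head()) would trigger it just the same.
print(df.compute())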