
What would be the equivalent of sort_values in pandas for a Dask DataFrame? I am trying to scale some pandas code which has memory issues to use a Dask DataFrame instead.

Would the equivalent be:

ddf.set_index([col1, col2], sorted=True)

?

gerrit
femibyte

3 Answers


Sorting in parallel is hard. You have two options in dask.dataframe:

set_index

As of now, you can call set_index with a single-column index:

In [1]: import pandas as pd

In [2]: import dask.dataframe as dd

In [3]: df = pd.DataFrame({'x': [3, 2, 1], 'y': ['a', 'b', 'c']})

In [4]: ddf = dd.from_pandas(df, npartitions=2)

In [5]: ddf.set_index('x').compute()
Out[5]: 
   y
x   
1  c
2  b
3  a

Unfortunately, dask.dataframe does not (as of November 2016) support multi-column indexes:

In [6]: ddf.set_index(['x', 'y']).compute()
NotImplementedError: Dask dataframe does not yet support multi-indexes.
You tried to index with this index: ['x', 'y']
Indexes must be single columns only.

nlargest

Given how you phrased your question I suspect that this doesn't apply to you, but often cases that use sorting can get by with the much cheaper solution nlargest.

In [7]: ddf.x.nlargest(2).compute()
Out[7]: 
0    3
1    2
Name: x, dtype: int64

In [8]: ddf.nlargest(2, 'x').compute()
Out[8]: 
   x  y
0  3  a
1  2  b
MRocklin
  • Thanks Matthew. If I knew the number of rows of the dataframe in advance, then `nlargest` would work, right? – femibyte Nov 02 '16 at 17:49
  • nlargest returns a dask.dataframe of a single partition, so it's generally not a good way to sort things. – MRocklin Nov 02 '16 at 19:09
  • So, for applying a sorting operation across all the partitions and getting a final result, which approach would be a good one? – Koustav Chanda Nov 10 '19 at 14:32

My preferred method is to first set_index using a single column in Dask, and then to distribute pandas' sort_values across the partitions using map_partitions:

# Prepare data
import dask
import dask.dataframe as dd
data = dask.datasets.timeseries()

# Sort by 'name' and 'id'
data = data.set_index('name')
data = data.map_partitions(lambda df: df.sort_values(['name', 'id']))

One possible gotcha would be that a single index value must not be split across multiple partitions. From what I have seen in practice, Dask does not seem to let that happen, but it would be good to have a more well-founded opinion on that.

edit: I have asked about this in Dask dataframe: Can a single index be in multiple partitions?

Dahn

You can add a new composite column and set the index to it:

newcol = ddf.col1 + "|" + ddf.col2
ddf = ddf.assign(ind=newcol)
ddf = ddf.set_index('ind', sorted=True)

If the dataframe is already sorted by (col1, col2) then it is already sorted by newcol too, so you can use sorted=True.

negas
  • I don't believe this works — `sorted=True` means that you're promising Dask the index is _already_ sorted, not that you're requesting Dask sort it. See https://github.com/dask/dask/issues/2388 – goodside Aug 31 '19 at 21:58
  • I have fixed the explanation, it was not clear. Thank you. – negas Apr 11 '21 at 12:36