
I am just trying to order a dask dataframe by a specific column.

CODE 1 - If I call it, it indeed shows up as a ddf (Dask dataframe)

my_ddf

OUTPUT 1

npartitions=1   
headers .....

CODE 2

my_ddf.sort_values('id', ascending=False)

OUTPUT 2

AttributeError                            Traceback (most recent call last)
<ipython-input-374-35ce4bd06557> in <module>
----> 1 my_ddf.sort_values('id', ascending=False) #.head(20)
      2 # df.sort_values(columns, ascending=True)

~/anaconda3/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in __getattr__(self, key)
   3619             return self[key]
   3620         else:
-> 3621             raise AttributeError("'DataFrame' object has no attribute %r" % key)
   3622 
   3623     def __dir__(self):

AttributeError: 'DataFrame' object has no attribute 'sort_values'

Tried Solutions

  • This is an example from the official Dask documentation: `df.sort_values(columns, ascending=False).head(n)`
  • pandas only - DataFrame object has no attribute 'sort_values'
  • pandas only - 'DataFrame' object has no attribute 'sort'
  • DASK answer - https://stackoverflow.com/a/40378896/10270590
    • I don't want to set it as the index because I want to keep only my current index values.
    • The following answer is a bit strange, and I am not sure it would work when I have more partitions (currently I have 1 because of a previous group-by of the data), how to avoid hard-coding the arbitrary big number "1000000000", or how to make the result increase from top to bottom in the Dask dataframe: my_ddf.nlargest(1000000000, 'id').compute() (see the sketch below)
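
A minimal sketch of that nlargest idea, assuming the magic number can simply be replaced by the computed row count (note that nlargest still returns the rows in descending order of 'id'):

n_rows = len(my_ddf)                              # triggers a computation of the row count
result = my_ddf.nlargest(n_rows, 'id').compute()  # all rows, largest 'id' first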
sogu

3 Answers


AFAIK, sorting across partitions is not implemented (yet?). If the dataset is small enough to fit in memory you can do `ddf = ddf.compute()` and then run the sort on the resulting pandas dataframe.
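
A minimal sketch of that, assuming the data really does fit in memory:

pdf = my_ddf.compute()                        # materialize the Dask dataframe as pandas
pdf = pdf.sort_values('id', ascending=False)  # plain pandas sort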

SultanOrazbayev
  • Currently it is, but only because I am building out the pipeline with a smaller dataset. So it would be crucial to keep everything in Dask; when I put 100x the load on it, it should still function. – sogu Jan 28 '21 at 10:03
  • I see, when I had a problem like that I had to incorporate additional logic that dask was not aware of, e.g. that certain values could only appear in certain partitions, so a full data shuffle was not needed. I ended up using `delayed` for that task. – SultanOrazbayev Jan 28 '21 at 10:40
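
A rough sketch of that delayed-based, per-partition approach, assuming (as in the comment) that rows never need to move between partitions, so each partition can be sorted independently:

import dask
import dask.dataframe as dd

parts = my_ddf.to_delayed()          # one delayed pandas frame per partition
sorted_parts = [
    dask.delayed(lambda pdf: pdf.sort_values('id', ascending=False))(p)
    for p in parts
]
# meta is inferred from the first partition; no cross-partition shuffle happens,
# so this is only correct when each range of 'id' values lives in a single partition
my_ddf_sorted = dd.from_delayed(sorted_parts)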

Dask indexes are not global anyway (by default). If you want to retain the original within-partition index, you can do something like:

df["old_index"] = df.reset_index()
df.set_index("colA")
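
For example, on toy data (colA comes from the snippet above, everything else is assumed):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"colA": [3, 1, 2]}, index=[10, 11, 12])
df = dd.from_pandas(pdf, npartitions=1)

df = df.reset_index().rename(columns={"index": "old_index"})  # preserve the old index
df = df.set_index("colA")                                     # rows are now ordered by colA
print(df.compute())                                           # old index survives in 'old_index'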
saloni

Try setting the index to id and then sorting within each partition through map_partitions, like below:

df = df.set_index("id")   # global shuffle: partitions are now ordered by id (ascending)
# then reverse the order within each partition
df = df.map_partitions(lambda part: part.sort_values(["id"], ascending=False)).reset_index()
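
A small illustration of this on assumed toy data (only the column name 'id' comes from the question):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"id": [2, 5, 1, 4, 3], "val": list("abcde")})
df = dd.from_pandas(pdf, npartitions=1)            # one partition, as in the question

df = df.set_index("id")                            # shuffle/sort by id (ascending)
df = df.map_partitions(lambda part: part.sort_values(["id"], ascending=False)).reset_index()
print(df.compute())                                # 'id' now runs 5, 4, 3, 2, 1

Note that with more than one partition the partitions themselves stay in ascending id order, so the concatenated result is only descending within each partition.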
Flair