
When loading data from CSV files, some of them cannot be loaded, resulting in empty partitions. I would like to remove all empty partitions, since some methods do not seem to work well with them. I have tried repartitioning: repartition(npartitions=10) works, for example, but a larger value can still leave empty partitions behind.
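For illustration, here is roughly what I'm doing (the file paths are hypothetical):

import dask.dataframe as dd

df = dd.read_csv("data/*.csv")
df = df.repartition(npartitions=10)
# Per-partition row counts; the zeros are the empty partitions I want to drop.
print(df.map_partitions(len).compute())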

What's the best way of achieving this? Thanks.

morganics
  • Can I ask you to raise issues when you encounter a function that does not work well with empty partitions? – MRocklin Dec 14 '17 at 15:05

4 Answers


I've found that filtering a Dask dataframe, e.g., by date, often results in empty partitions. If you're having trouble using a dataframe with empty partitions, here's a function, based on MRocklin's guidance, to cull them:

import dask.dataframe as dd

def cull_empty_partitions(df):
    # Compute the length of every partition up front.
    ll = list(df.map_partitions(len).compute())
    df_delayed = df.to_delayed()
    df_delayed_new = list()
    pempty = None
    for ix, n in enumerate(ll):
        if n == 0:
            # Keep a reference to an empty partition: it carries the
            # schema, which from_delayed needs as its meta.
            pempty = df.get_partition(ix)
        else:
            df_delayed_new.append(df_delayed[ix])
    if pempty is not None:
        # Rebuild the dataframe from the non-empty partitions only.
        df = dd.from_delayed(df_delayed_new, meta=pempty)
    return df
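For example, usage might look like this (the date column and threshold are hypothetical):

filtered = df[df["date"] > "2017-01-01"]   # a filter that empties some partitions
filtered = cull_empty_partitions(filtered)
print(filtered.map_partitions(len).compute())   # no zeros remain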
tpegbert

For anyone working with Bags (not DataFrames), this function will do the trick:

import dask.bag

def cull_empty_partitions(bag):
    """
    When a bag is created by filtering or grouping another bag,
    it retains the original bag's partition count, even if many of
    the partitions become empty.
    Those extra partitions add overhead, so it's nice to discard them.
    This function drops the empty partitions.
    """
    bag = bag.persist()
    def get_len(partition):
        # If the bag is the result of bag.filter(),
        # then each partition is actually a 'filter' object,
        # which has no __len__.
        # In that case, we must convert it to a list first.
        if hasattr(partition, '__len__'):
            return len(partition)
        return len(list(partition))
    partition_lengths = bag.map_partitions(get_len).compute()

    # Pair each partition's length with its 'delayed' object
    lengths_and_partitions = zip(partition_lengths, bag.to_delayed())

    # Keep only the delayed objects for non-empty partitions
    partitions = [p for l, p in lengths_and_partitions if l > 0]

    # Convert the remaining delayed objects back into a Bag.
    return dask.bag.from_delayed(partitions)
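A quick usage sketch (the filter predicate is just an example):

import dask.bag

b = dask.bag.from_sequence(range(1000), npartitions=100)
b = b.filter(lambda x: x < 10)   # most partitions are now empty
print(b.npartitions)             # still 100
b = cull_empty_partitions(b)
print(b.npartitions)             # only the non-empty partitions remain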
Stuart Berg

There is no simple API to do this. You could call df.map_partitions(len) to determine which partitions are empty and then remove them explicitly, perhaps by using df.to_delayed() and dask.dataframe.from_delayed(...).
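A minimal sketch of that approach (just the idea, not a polished API; it assumes at least one partition is non-empty):

import dask.dataframe as dd

lengths = df.map_partitions(len).compute()
parts = [p for p, n in zip(df.to_delayed(), lengths) if n > 0]
df = dd.from_delayed(parts)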

In the future if you are willing to raise issues when you find a function that does not work well with empty partitions that would be greatly appreciated. https://github.com/dask/dask/issues/new

MRocklin

Here's my attempt at removing empty partitions:

import numpy as np
import dask.dataframe as dd

def remove_empty_partitions(ddf):
    """Remove empty partitions from a Dask dataframe."""
    partition_lens = ddf.map_partitions(len).compute()
    # Indices of the partitions that contain zero rows.
    ids_of_empty_partitions = set(np.flatnonzero(np.asarray(partition_lens) == 0))
    if len(ids_of_empty_partitions) == len(partition_lens):
        # All partitions are empty, so keep a single (empty) partition.
        return ddf.partitions[0]
    if ids_of_empty_partitions:
        # Concatenate only the non-empty partitions.
        return dd.concat([
            ddf.get_partition(n) for n in range(ddf.npartitions)
            if n not in ids_of_empty_partitions
        ])
    # No empty partitions: return the dataframe unchanged.
    return ddf
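For example (the filter is hypothetical):

ddf = ddf[ddf["value"] > 0]   # may leave empty partitions behind
ddf = remove_empty_partitions(ddf)
print(ddf.npartitions)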

FWIW, the answer by @tpegbert appears to be more efficient in terms of the number of tasks needed to get the filtered dataframe.

SultanOrazbayev