For anyone working with Bags (not DataFrames), this function will do the trick:
import dask.bag

def cull_empty_partitions(bag):
    """
    When a bag is created by filtering or grouping another bag,
    it retains the original bag's partition count, even if many
    of the partitions become empty.
    Those extra partitions add overhead, so it's nice to discard them.
    This function drops the empty partitions.
    """
    bag = bag.persist()
    def get_len(partition):
        # If the bag is the result of bag.filter(),
        # then each partition is actually a 'filter' object,
        # which has no __len__.
        # In that case, we must convert it to a list first.
        if hasattr(partition, '__len__'):
            return len(partition)
        return len(list(partition))
    partition_lengths = bag.map_partitions(get_len).compute()
    # Pair each partition's length with its 'delayed' object
    lengths_and_partitions = zip(partition_lengths, bag.to_delayed())
    # Keep only the non-empty partitions
    partitions = [p for l, p in lengths_and_partitions if l > 0]
    # Convert from a list of delayed objects back into a Bag.
    return dask.bag.from_delayed(partitions)
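
As a quick sanity check, here's a hypothetical example (the sequence and partition counts are made up for illustration) where a filter empties all but one partition:

import dask.bag as db

# 100 items spread over 10 partitions; the filter keeps only
# items from the first partition, leaving the other 9 empty.
bag = db.from_sequence(range(100), npartitions=10)
filtered = bag.filter(lambda x: x < 5)

print(filtered.npartitions)   # 10 -- empty partitions are kept
culled = cull_empty_partitions(filtered)
print(culled.npartitions)     # 1  -- empty partitions dropped
print(culled.compute())       # [0, 1, 2, 3, 4]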