You're facing a common tradeoff in distributed workflows. Do you want to spend the time and compute required to determine the exact min/max, which is a prerequisite for the binning scheme you describe, or is an approximate answer acceptable? If the latter, how do you design an algorithm that adequately captures the data's min/max while remaining efficient?
We can start with the exact solution, since it's easier to implement. The key is simply to find the min and max first, then digitize the data. Note that this requires computing all values in the column twice: once for the min/max reduction and once for the binning itself. If persisting the data is an option (e.g. you are working with a distributed cluster or can fit the column to be binned in memory), persisting it first avoids this unnecessary repetition:
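```python
import dask
import dask.dataframe
import numpy as np
import pandas as pd


def _digitize_partition(part: pd.Series, bins: np.ndarray) -> pd.Series:
    # np.digitize returns a plain int64 ndarray; wrap it back into a Series
    # aligned with the partition index and cast it to the dtype declared in meta
    codes = np.digitize(part.to_numpy(), bins)
    return pd.Series(codes, index=part.index, name='binned').astype('uint16')


def discretize_exact(
    s: dask.dataframe.Series, K: int
) -> dask.dataframe.Series:
    """
    Discretize values in a dask.dataframe Series into K equal-width bins

    Parameters
    ----------
    s : dask.dataframe.Series
        Series with values to be binned
    K : int
        Number of equal-width bins to generate

    Returns
    -------
    binned : dask.dataframe.Series
        dask.dataframe.Series with a scheduled np.digitize operation
        called using map_partitions. The values in ``binned`` will
        be in [0, K - 1], giving the index of each value's bin among
        the K bins spanning [vmin, vmax].
    """
    # schedule the min/max computation
    vmin, vmax = s.min(), s.max()
    # compute vmin and vmax together so the graph for s is only computed once
    vmin, vmax = dask.compute(vmin, vmax)
    # K + 1 evenly spaced edges define K equal-width bins; dropping the two
    # outer edges leaves K - 1 interior edges with open outer ends, so the
    # first bin is (-inf, vmin + step) and the last is [vmax - step, inf)
    bins = np.linspace(vmin, vmax, K + 1)[1:-1]
    return s.map_partitions(
        _digitize_partition,
        bins=bins,
        meta=('binned', 'uint16'),
    )
```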
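To see what the trimmed edges produce, here is the same edge construction applied to a small in-memory NumPy array, with no dask involved. This is a minimal sketch; `values` and `K` are illustrative. The smallest value lands in bin 0 and the largest in bin K - 1, so every value is covered and no code ever reaches K.

```python
import numpy as np

K = 4
values = np.array([0.0, 0.1, 0.25, 0.5, 0.74, 0.75, 0.99, 1.0])

vmin, vmax = values.min(), values.max()
# K + 1 edges define K bins; keep only the K - 1 interior edges
edges = np.linspace(vmin, vmax, K + 1)[1:-1]  # 0.25, 0.5, 0.75
# with the default right=False, a value equal to an edge goes to the bin above it
codes = np.digitize(values, edges)

print(codes)  # [0 0 1 2 2 3 3 3]
```

Because the outer edges are dropped, the maximum value does not spill into an extra bin K, which is why the codes stop at K - 1.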
The `discretize_exact` function does (I think) what you're looking for, though it has to compute the min and max before the binning operation can be scheduled. Using an example frame:
```python
import dask.dataframe
import numpy as np
import pandas as pd

N = 10000
df = dask.dataframe.from_pandas(
    pd.DataFrame({'a': np.random.random(size=N)}),
    chunksize=1000,
)
```
We can use the above function to discretize our data:
```
In [68]: df['binned_a'] = discretize_exact(df['a'], K=10)

In [69]: df
Out[69]:
Dask DataFrame Structure:
                      a binned_a
npartitions=10
0               float64   uint16
1000                ...      ...
...                 ...      ...
9000                ...      ...
9999                ...      ...
Dask Name: assign, 40 tasks

In [70]: df.compute()
Out[70]:
             a  binned_a
0     0.548415         5
1     0.872668         8
2     0.466869         4
3     0.133986         1
4     0.833126         8
...        ...       ...
9995  0.223438         2
9996  0.575271         5
9997  0.922593         9
9998  0.030127         0
9999  0.204283         2

[10000 rows x 2 columns]
```
Alternatively, you could try to approximate the bin edges. There are a number of ways to do this, including sampling the dataframe to find the min/max of one or more partitions, or supplying your own deliberately wide estimate of the range. Note that, depending on your workflow, computing even the first partition may still involve computing a large part of the overall graph, or even the entire graph if, e.g., the dataframe was reshuffled in a recent step.
```python
import dask.dataframe


def find_minmax_of_first_partition(
    s: dask.dataframe.Series
) -> tuple[float, float]:
    """
    Find the min and max of the first partition of a dask.dataframe.Series
    """
    partition_0_stats = (
        s.partitions[0].compute().agg(['min', 'max'])
    )
    return (
        partition_0_stats['min'].item(),
        partition_0_stats['max'].item(),
    )
```
You could widen this range if desired, using your intuition about the spread of the values:
```python
import numpy as np
import pandas as pd

K = 10
s = df['a']

vmin_p0, vmax_p0 = find_minmax_of_first_partition(s)
range_p0 = vmax_p0 - vmin_p0
mean_p0 = (vmin_p0 + vmax_p0) / 2
# guess that the overall data is within 10x the range of the first partition
min_est, max_est = mean_p0 - 5 * range_p0, mean_p0 + 5 * range_p0
# now, bin all values using this estimated min/max. Unlike discretize_exact,
# all K + 1 edges are kept, so in-range values are coded 1..K and any data
# falling outside the estimated range is coded as 0 (below min_est) or
# K + 1 (at or above max_est).
bins = np.linspace(min_est, max_est, K + 1)
binned = s.map_partitions(
    lambda part: pd.Series(
        np.digitize(part.to_numpy(), bins), index=part.index, name='binned'
    ).astype('uint16'),
    meta=('binned', 'uint16'),
)
```
These bins will be equally spaced, but they will not necessarily start and end at the data's actual min/max, so they may miss some of the data or leave empty bins at the edges. You may need to check how your bin specification performs and iterate based on your data.
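One way to check how a bin specification performs is to tally the codes once they (or a computed subset of them) are in memory. The sketch below uses NumPy only; `summarize_bins` is a hypothetical helper, and it assumes the full-edge coding from the widened-range example, where 0 and K + 1 mark values below and above the estimated range.

```python
import numpy as np


def summarize_bins(codes: np.ndarray, K: int) -> dict:
    """Count out-of-range values and empty in-range bins (hypothetical helper).

    Assumes codes come from np.digitize with all K + 1 edges, so in-range
    values are coded 1..K, values below the range 0, and values above K + 1.
    """
    counts = np.bincount(codes, minlength=K + 2)
    return {
        'below_range': int(counts[0]),
        'above_range': int(counts[K + 1]),
        # in-range bins 1..K that received no values at all
        'empty_bins': [i for i in range(1, K + 1) if counts[i] == 0],
    }


K = 4
edges = np.linspace(0.0, 1.0, K + 1)  # 0, 0.25, 0.5, 0.75, 1.0
values = np.array([-0.5, 0.1, 0.2, 0.6, 0.9, 1.0, 3.0])
codes = np.digitize(values, edges)
report = summarize_bins(codes, K)
# report == {'below_range': 1, 'above_range': 2, 'empty_bins': [2]}
```

Note that a value exactly equal to the upper estimate (1.0 here) is counted as above the range. Many out-of-range values suggest widening the estimate, while several empty bins at the edges suggest narrowing it; with the dask result, `binned.compute().to_numpy()` could supply `codes`.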