
I have three 1D arrays:

  • idxs: the index data
  • weights: the weight of each index in idxs
  • bins: the bin edges used to group idxs; the minimum weight is calculated within each bin.

Here's my current method: I use idxs to determine which bin each element of weights falls into, and then calculate the min/max of the binned weights:

illustration

  1. Get slices that show which bin each idxs element belongs to.
  2. Sort slices and weights at the same time.
  3. Calculate the minimum of weights in each bin (slice).

numpy method

import random
import numpy as np

# create example data
out_size = int(10)
bins = np.arange(3, out_size-3)
idxs = np.arange(0, out_size)
#random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

# get which bin idxs belong to
slices = np.digitize(idxs, bins)

# get index and weights in bins
valid = (bins.max() >= idxs) & (idxs >= bins.min())
valid_slices = slices[valid]
valid_weights = weights[valid]

# sort slice and weights
sort_index = valid_slices.argsort()
valid_slices_sort = valid_slices[sort_index]
valid_weights_sort = valid_weights[sort_index]

# get the index of the first occurrence of each unique slice
unique_slices, unique_index = np.unique(valid_slices_sort, return_index=True)
# calculate the minimum
res_sub = np.minimum.reduceat(valid_weights_sort, unique_index)

# save results
res = np.full((out_size), np.nan)
res[unique_slices-1] = res_sub

print(res)

Results:

array([ 3., nan,  5., nan, nan, nan, nan, nan, nan, nan])
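
For reference, the intermediate arrays in this small example work out as follows:

idxs               = [0 1 2 3 3 5 7 7 8 9]
slices             = [0 0 0 1 1 3 4 4 4 4]
valid_weights_sort = [3 3 5]    (only idxs within [bins.min(), bins.max()], with slices [1 1 3])
unique_slices      = [1 3], unique_index = [0 2]
res_sub            = [3 5]      -> written to res[0] and res[2]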

If I increase out_size to 1e7 and shuffle the data, the timing (from np.digitize to the end) is slow:

13.5 s ± 136 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

And, here's the consumed time of each part:

np.digitize: 10.8 s ± 12.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
valid: 171 ms ± 3.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
argsort and slice: 2.02 s ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
unique: 9.9 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
np.minimum.reduceat: 5.11 ms ± 52.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

np.digitize() takes most of the time (10.8 s), followed by argsort (2.02 s).
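
Since idxs is int64 and the bins are a consecutive integer range from np.arange, np.digitize can in principle be replaced by plain arithmetic. A minimal sketch, assuming exactly that structure (not a general drop-in for np.digitize):

slices_fast = np.clip(idxs - bins[0] + 1, 0, len(bins))  # integer idxs, bins = np.arange(lo, hi)
assert np.array_equal(slices_fast, np.digitize(idxs, bins))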

I also checked the time taken to calculate the mean using np.histogram:

counts, _ = np.histogram(idxs, bins=out_size, range=(0, out_size))
sums, _ = np.histogram(idxs, bins=out_size, range=(0, out_size), weights = weights, density=False)
mean = sums / np.where(counts == 0, np.nan, counts)

33.2 s ± 3.47 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

This is similarly slow to my method of calculating the minimum.
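
For the record, a minimal alternative sketch, assuming idxs are non-negative integers below out_size (as in this example): np.bincount produces the same per-index counts and weighted sums as the np.histogram calls above.

counts_bc = np.bincount(idxs, minlength=out_size)                 # same as counts above
sums_bc = np.bincount(idxs, weights=weights, minlength=out_size)  # same as sums above
mean_bc = sums_bc / np.where(counts_bc == 0, np.nan, counts_bc)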

scipy method

from scipy.stats import binned_statistic
statistics, _, _ = binned_statistic(idxs, weights, statistic='min', bins=bins)

print(statistics)

The result is a little different, and it is much slower (~6x) on the longer (1e7) shuffled data:

array([ 3., nan,  5.])

1min 20s ± 6.93 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

Summary

I want to figure out a quicker method. If the method is also suitable for dask, that would be excellent!

User Case

Here's what my real data (1D) looks like (see the "real data" plot):

zxdawn
  • For your actual use case, do you need to maintain the weights functionality of binned_statistic? (e.g. for computing 'mean') – SultanOrazbayev Jun 08 '21 at 09:18
  • @SultanOrazbayev The weights are just the data. I don't need to multiply it for calculation. Sorry for the misleading name. I just want to get the minimum or maximum or mean of data within each bin. – zxdawn Jun 08 '21 at 09:30
  • OK, it's just min/max are easy to do without `binned_statistic`, but for calculating the mean, the weights matter... I'll think of a good answer. – SultanOrazbayev Jun 08 '21 at 09:34
  • For the mean, I know the quick method: just calculate the sum and count using histogram. Please feel free to post the min/max solution ;) – zxdawn Jun 08 '21 at 09:39
  • Any assumptions about your data being ordered/sorted? digitization or computing a histogram usually involves sorting, so you will have a hard time beating O(nlog(n)), unless you can make assumptions on the ordering of your data. – FirefoxMetzger Jun 08 '21 at 14:57
  • Same question, but for the range and data types. Is it integers only like in the example or floats, too? is there an upper and/or lower bound? – FirefoxMetzger Jun 08 '21 at 15:03
  • @FirefoxMetzger The `idx` is `int64` with ascending order with many fill_value. The `weights` is `float64` and no order. See the added `User Case` Section. – zxdawn Jun 09 '21 at 02:40
  • @SultanOrazbayev Hope you don't misunderstand the situation. I use the `idxs` to check the data called `weights` in which bin, and then calculate the min/max of binned `weights`. If you know the faster method, could you teach me how to apply it using `dask`? Thanks! – zxdawn Jun 09 '21 at 02:54
  • This looks like `bins = np.range(0, max_bin)`. Is that intuition coming from your plots correct? – FirefoxMetzger Jun 09 '21 at 07:55
  • @FirefoxMetzger Yes, that's similar to `range=(0, out_size)` used in the `np.histogram()`. – zxdawn Jun 09 '21 at 08:00

2 Answers


A quick approach is to use dask.dataframe with pd.cut; I first show the pandas version:

import numpy as np
from scipy.stats import binned_statistic as bs
import pandas as pd

nrows=10**7

df = pd.DataFrame(np.random.rand(nrows, 2), columns=['x', 'val'])

bins = np.linspace(df['x'].min(), df['x'].max(), 10)

df['binned_x'] = pd.cut(df['x'], bins=bins, right=False)

result_pandas = df.groupby('binned_x')['val'].min().values
result_scipy = bs(df['x'], df['val'], 'min', bins=bins)[0]

print(np.isclose(result_pandas, result_scipy))
# [ True  True  True  True  True  True  True  True  True]

Now to go from pandas to dask, you will need to make sure that bins are consistent across partitions, so take a look here. Once every partition is binned consistently, you want to apply the desired operation (min/max/sum/count):

import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=10)

def f(df, bins):
    df = df.copy()
    df['binned_x'] = pd.cut(df['x'], bins=bins, right=False)
    result = df.groupby('binned_x', as_index=False)['val'].min()
    return result

result_dask = ddf.map_partitions(f, bins).groupby('binned_x')['val'].min().compute()

print(np.isclose(result_pandas, result_dask))
# [ True  True  True  True  True  True  True  True  True]

On my laptop, the first snippet takes about 3 seconds (my earlier figure of 7 seconds double-counted both pandas and scipy performing the same operation), and the dask version is about 10 times faster. There is scope for playing around with partitioning, but that is context-dependent, so it's something you can try optimizing on your data/hardware.

Update: note that this approach works for min/max, but for the mean you will want to calculate the sum and count and then divide them (a minimal sketch follows below). There is probably a good way of keeping track of the weights to do this calculation in one go, but it might not be worth the added code complexity.
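
A minimal sketch of that sum-and-count route, reusing ddf and bins from the code above (the helper sum_count and the aggregation details are illustrative, not part of the code above):

def sum_count(part, bins):
    part = part.copy()
    part['binned_x'] = pd.cut(part['x'], bins=bins, right=False)
    # observed=False keeps empty bins so they show up as NaN in the final mean
    return part.groupby('binned_x', observed=False)['val'].agg(['sum', 'count']).reset_index()

partial = ddf.map_partitions(sum_count, bins)
totals = partial.groupby('binned_x')[['sum', 'count']].sum().compute()
mean_per_bin = totals['sum'] / totals['count']  # empty bins end up as 0/0 -> NaN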

SultanOrazbayev
  • Thanks, this method works well without memory error. Could FirefoxMetzger's method be combined with dask? That may increase the speed and save memory at the same time. – zxdawn Jun 09 '21 at 12:07
  • Yes, I am certain it is possible. – SultanOrazbayev Jun 09 '21 at 12:11
  • @XinZhang While re-reading Sultan's answer I realized that he/she has fixed the number of 10 `bins` and scaled `idxs` and `weights` to `1e7`. Are we looking for a solution that keeps the number of bins low, or is the number of bins `1e7` (or somewhere in that magnitude)? – FirefoxMetzger Jun 09 '21 at 14:25
  • @FirefoxMetzger The exact length of each data is shown as the x-axis called dim_0 in the `User Case` at the end of question description. The number of idxs and weights is around 1.5e5, the number of bins is around 3.8e4. – zxdawn Jun 09 '21 at 15:18

SultanOrazbayev showed a quick approach; I'll add a cool one.

mask = bins[:, None] == idxs[None, :]
result = np.nanmin(np.where(mask, weights, np.nan), axis=-1)
# Note: may produce (expected) runtime warning if bin has no values

Of course, you can also use np.nanmax, np.nanmean, etc.

The above assumes that your bins are indeed single values. If they are ranges, you need slightly more work to construct the mask:

lower_mask = idxs[None, :] >= bins[:, None]
upper_mask = np.empty_like(lower_mask)
upper_mask[:-1, ...] = idxs[None, :] < bins[1:, None]
upper_mask[-1, ...] = False

mask = lower_mask & upper_mask

at which point you can use np.nanmin like above.


Of course, np.where and the broadcast used to create the mask will allocate new arrays of shape (len(bins), len(idxs)) with their respective datatypes. If this is of no concern to you, then the above is probably good enough.
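
To put rough numbers on this, using the data sizes mentioned in the question's comments (~1.5e5 idxs and ~3.8e4 bins):

n_idxs, n_bins = int(1.5e5), int(3.8e4)
mask_gb = n_bins * n_idxs * 1 / 1e9   # boolean mask: ~5.7 GB
where_gb = n_bins * n_idxs * 8 / 1e9  # float64 result of np.where: ~45.6 GB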

If it is a problem (because you are pressed for RAM), then my first suggestion is to buy more RAM. If, for some stupid reason (say, money), that doesn't work, you can avoid the copy of weights by using a masked array over a manually re-strided view into weights:

import numpy.ma as ma

mask = ...

restrided_weights = np.lib.stride_tricks.as_strided(weights, shape=(bins.size, idxs.size), strides=(0, idxs.strides[0]))
masked = ma.masked_array(restrided_weights, mask=~mask, fill_value=np.nan, dtype=np.float64)
result = masked.min(axis=-1).filled(np.nan)

This avoids both a copy of weights and the above-mentioned runtime warning.

If you don't even have enough memory to construct mask, then you can try processing the data in chunks.

Last I checked, Dask used to have funny behavior when fed with manually strided arrays. There was some work on this though, so you may want to double-check if that is resolved, in which case you can happily parallelize the above.


Update based on your further comments to this answer and the other:

You can do this computation in chunks to avoid memory issues due to your large number of bins (1e4 in magnitude). Putting the concrete numbers into a full example and adding a progress bar indicates <40 seconds runtime on a single core.

import numpy.ma as ma
from tqdm import trange
import numpy as np
import random

# create example data
out_size = int(1.5e5)
#bins = np.arange(3, out_size-3)
bins = np.arange(3, int(3.8e4-3), dtype=np.int64)
idxs = np.arange(0, out_size)
random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

chunk_size = 100

# preallocate buffers to avoid array creation in main loop
extended_bins = np.empty(len(bins) + 1, dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity
mask_buffer = np.empty((chunk_size, len(idxs)), dtype=bool)


result = np.empty_like(bins, dtype=np.float64)

for low in trange(0, len(bins), chunk_size):
    high = min(low + chunk_size, len(bins))
    chunk_size = high - low
    mask_buffer[:chunk_size, ...] = ~((bins[low:high, None] <= idxs[None, :]) & (extended_bins[low+1:high+1, None] > idxs[None, :]))
    mask = mask_buffer[:chunk_size, ...]
    restrided_weights = np.lib.stride_tricks.as_strided(weights, shape=mask.shape, strides=(0, idxs.strides[0]))
    masked = ma.masked_array(restrided_weights, mask=mask, fill_value=np.nan, dtype=np.float64)
    result[low:high] = masked.min(axis=-1).filled(np.nan)

Bonus: For min and max only there is a cool trick that you can use: sort idxs and weights based on weights (ascending for min, descending for max). This way, you can immediately look up the min/max value and can avoid the masked array and the custom strides altogether. This relies on some not so well documented behavior of np.argmax, which takes a fast-pass for boolean arrays and doesn't search the full array.

It only works for these two cases, and you'd have to fall back to the above for more sophisticated things (mean), but for those two it shaves off another ~70% and a run on a single core clocks in at <12 seconds.
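
Before the full example, a tiny illustration of the np.argmax behaviour this relies on:

import numpy as np

row = np.array([False, False, True, True])
print(np.argmax(row))  # -> 2: index of the first True
# After sorting by weight, the first True in each bin's row points at the bin's minimum.
# An all-False row would return 0 (a wrong lookup), which is why the code below appends
# an always-True sentinel column whose weight is NaN.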

# fast min/max
from tqdm import trange
import numpy as np
import random

# create example data
out_size = int(1.5e5)
#bins = np.arange(3, out_size-3)
bins = np.arange(3, int(3.8e4-3), dtype=np.int64)
idxs = np.arange(0, out_size)
random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs


order = np.argsort(weights)
weights_sorted = np.empty((len(weights) + 1), dtype=np.float64)
weights_sorted[:-1] = weights[order]
weights_sorted[-1] = np.nan

idxs_sorted = idxs[order]

extended_bins = np.empty(len(bins) + 1, dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity

# preallocate buffers to avoid array creation in main loop
chunk_size = 1000
mask_buffer = np.empty((chunk_size, len(idxs) + 1), dtype=bool)
mask_buffer[:, -1] = True
result = np.empty_like(bins, dtype=np.float64)

for low in trange(0, len(bins), chunk_size):
    high = min(low + chunk_size, len(bins))
    chunk_size = high - low
    mask_buffer[:chunk_size, :-1] = (bins[low:high, None] <= idxs_sorted[None, :]) & (extended_bins[low+1:high+1, None] > idxs_sorted[None, :])
    mask = mask_buffer[:chunk_size, ...]
    weight_idx = np.argmax(mask, axis=-1)

    result[low:high] = weights_sorted[weight_idx]
FirefoxMetzger
  • Thanks, this is interesting, makes me want to try this with dask.arrays at some point. – SultanOrazbayev Jun 09 '21 at 09:15
  • @FirefoxMetzger Your method is incredible! I tested it with my example and found a strange problem: As you said, I get this memory error when increasing the length to 1e5. But, increasing it to 1e7, the first method returns one value: `nan`, while the second one still gets the memory error. – zxdawn Jun 09 '21 at 11:48
  • @SultanOrazbayev If I applied this method with dask like this: `bins = da.array(bins)` and `idxs = da.array(idxs)`, I get the similar error like the above. I suppose simply converting into dask array has no improvement at all. Do you have any idea what's the correct way to meet it? – zxdawn Jun 09 '21 at 11:52
  • @XinZhang applying with dask array will be a bit more involved than wrapping everything in `da.array`... – SultanOrazbayev Jun 09 '21 at 12:09
  • @XinZhang I've updated the answer in response to your concern about memory usage. You can, of course, refactor the above approach to use a dask array and multi-threading if you wish; personally, I think < 1 min runtime is acceptable as it is an easy justification for a quick coffee break. Regarding your finding of `np.where` producing a single `nan` as a result I am not sure why this happens, but can reproduce it on my machine. – FirefoxMetzger Jun 09 '21 at 16:33
  • Thanks! That's quite faster without memory error ;) But, sorry, it still sounds difficult for me to apply the dask method ... @SultanOrazbayev, do you have any dask ideas? Appreciate it a lot! – zxdawn Jun 10 '21 at 14:40