
I have a bunch of methods where the only difference is the Stream reduction operation:

static Stream<TimeValues> avg(Stream<TimeValues> data, int batchSize) {
    return data
            .collect(SlidingCollector.batch(batchSize))
            .stream()
            .map(tvl -> {
                    OptionalDouble od = tvl.stream()
                    ...
                        .mapToDouble(tv -> tv.doubleValue())
                        .average(); // <-------
                    ...      
            });
}
static Stream<TimeValues> max(Stream<TimeValues> data, int batchSize) {
    return data
            .collect(SlidingCollector.batch(batchSize))
            .stream()
            .map(tvl -> {
                    OptionalDouble od = tvl.stream()
                    ...
                        .mapToDouble(tv -> tv.doubleValue())
                        .max(); // <-------
                    ...      
            });
}

How can I factor out this code and pass the reduction operation (min, max, average, sum) as a parameter?

Thanks in advance.

datto
  • Aside: you might want to move `data.collect(SlidingCollector.batch(batchSize))` into a separate method as well. With a slightly more hypothetical input, you could possibly have [implemented custom SummaryStatistics](https://stackoverflow.com/a/51378142/1746118) too. – Naman May 02 '20 at 11:24
  • Thanks @Naman - I will give it a look. – datto May 02 '20 at 12:36
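The comment's idea can be approximated with the JDK's built-in `DoubleSummaryStatistics`: summarize each batch once, then choose the statistic with a `ToDoubleFunction<DoubleSummaryStatistics>`. This is a minimal sketch that assumes the batch values are already `double`s. The names `BatchStats` and `reduceBatch` are hypothetical and are not part of the question's code.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.function.ToDoubleFunction;

public class BatchStats {
    // Computes count, sum, min, max and average of the batch in a single pass,
    // then returns whichever statistic the caller picks.
    static double reduceBatch(List<Double> batch,
                              ToDoubleFunction<DoubleSummaryStatistics> pick) {
        DoubleSummaryStatistics stats = batch.stream()
                .mapToDouble(Double::doubleValue)
                .summaryStatistics();
        return pick.applyAsDouble(stats);
    }

    public static void main(String[] args) {
        List<Double> batch = List.of(1.0, 4.0, 2.5);
        System.out.println(reduceBatch(batch, DoubleSummaryStatistics::getMax));     // 4.0
        System.out.println(reduceBatch(batch, DoubleSummaryStatistics::getAverage)); // 2.5
        System.out.println(reduceBatch(batch, DoubleSummaryStatistics::getSum));     // 7.5
    }
}
```

Unlike the `OptionalDouble` returned by `max()` or `average()`, an empty `DoubleSummaryStatistics` reports `getMax()` as `Double.NEGATIVE_INFINITY` and `getAverage()` as `0.0`, so empty batches would need separate handling.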

2 Answers


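Both `average()` and `max()` are no-argument `DoubleStream` methods that return an `OptionalDouble`, so the method references `DoubleStream::average` and `DoubleStream::max` both fit `Function<DoubleStream, OptionalDouble>`: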

static Stream<TimeValues> avg(Stream<TimeValues> data, int batchSize) {
    return reduce(data, batchSize, DoubleStream::average);
}

static Stream<TimeValues> max(Stream<TimeValues> data, int batchSize) {
    return reduce(data, batchSize, DoubleStream::max);
}

private static Stream<TimeValues> reduce(Stream<TimeValues> data, int batchSize,
                                         Function<DoubleStream, OptionalDouble> reducer) {
    return data
            .collect(SlidingCollector.batch(batchSize))
            .stream()
            .map(tvl -> {
                    OptionalDouble od = reducer.apply(tvl.stream()
                    ...
                        .mapToDouble(tv -> tv.doubleValue()));
                    ...      
            });
}
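Because the snippet above elides parts of the question's code, here is a self-contained sketch of the same parameterized-reducer approach that compiles and runs on its own (records need Java 16+). It rests on several assumptions: `TimeValues` is a hypothetical record with a `time` and a `value`, `slidingBatches` is a hypothetical stand-in for `SlidingCollector.batch` (whose real semantics are not shown in the question), and each result is stamped with the time of its window's last element purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalDouble;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;

public class ReduceDemo {
    // Hypothetical stand-in for the question's TimeValues type.
    public record TimeValues(long time, double value) {
        public double doubleValue() {
            return value;
        }
    }

    // Hypothetical stand-in for SlidingCollector.batch: every window of
    // batchSize consecutive elements, advancing one element at a time.
    static List<List<TimeValues>> slidingBatches(List<TimeValues> all, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        List<List<TimeValues>> windows = new ArrayList<>();
        for (int i = 0; i + batchSize <= all.size(); i++) {
            windows.add(all.subList(i, i + batchSize));
        }
        return windows;
    }

    static Stream<TimeValues> avg(Stream<TimeValues> data, int batchSize) {
        return reduce(data, batchSize, DoubleStream::average);
    }

    static Stream<TimeValues> max(Stream<TimeValues> data, int batchSize) {
        return reduce(data, batchSize, DoubleStream::max);
    }

    // The shared pipeline; only the reducer differs between avg, max, etc.
    private static Stream<TimeValues> reduce(Stream<TimeValues> data, int batchSize,
                                             Function<DoubleStream, OptionalDouble> reducer) {
        List<TimeValues> all = data.collect(Collectors.toList());
        return slidingBatches(all, batchSize).stream()
                .map(batch -> {
                    OptionalDouble od = reducer.apply(
                            batch.stream().mapToDouble(TimeValues::doubleValue));
                    // Assumption: label each result with the time of the window's last element.
                    long time = batch.get(batch.size() - 1).time();
                    return new TimeValues(time, od.orElse(Double.NaN));
                });
    }

    public static void main(String[] args) {
        List<TimeValues> data = List.of(
                new TimeValues(1, 1.0), new TimeValues(2, 4.0),
                new TimeValues(3, 2.0), new TimeValues(4, 8.0));
        avg(data.stream(), 2).forEach(System.out::println); // values 2.5, 3.0, 5.0
        max(data.stream(), 2).forEach(System.out::println); // values 4.0, 4.0, 8.0
    }
}
```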
Andreas
  • Thanks @Andreas. I was trying to use `DoubleStream::reduce()` but could only make it work with `Double::min` and `Double::max`. – datto May 02 '20 at 12:41
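About `DoubleStream::reduce()`: `reduce(DoubleBinaryOperator)` only works when the operation is associative over plain `double`s. That holds for `Double::min`, `Double::max` and `Double::sum`, but not for the average, which also needs a running count. The following minimal sketch (the class name `ReduceOps` is hypothetical) wraps each binary operator in the same `Function<DoubleStream, OptionalDouble>` shape the answers use, and keeps `DoubleStream::average` for the mean.

```java
import java.util.OptionalDouble;
import java.util.function.DoubleBinaryOperator;
import java.util.function.Function;
import java.util.stream.DoubleStream;

public class ReduceOps {
    // Wraps an associative binary operator in the Function<DoubleStream, OptionalDouble>
    // shape. DoubleStream.reduce(op) returns OptionalDouble.empty() for an empty stream.
    static Function<DoubleStream, OptionalDouble> fromOperator(DoubleBinaryOperator op) {
        return ds -> ds.reduce(op);
    }

    static final Function<DoubleStream, OptionalDouble> MIN = fromOperator(Double::min);
    static final Function<DoubleStream, OptionalDouble> MAX = fromOperator(Double::max);
    static final Function<DoubleStream, OptionalDouble> SUM = fromOperator(Double::sum);
    // Averaging is not associative: avg(avg(1, 2), 3) = 2.25, but avg(1, 2, 3) = 2.0.
    // So there is no DoubleBinaryOperator for it; use the built-in average() instead.
    static final Function<DoubleStream, OptionalDouble> AVG = DoubleStream::average;

    public static void main(String[] args) {
        System.out.println(MIN.apply(DoubleStream.of(3.0, 1.0, 2.0)).getAsDouble()); // 1.0
        System.out.println(MAX.apply(DoubleStream.of(3.0, 1.0, 2.0)).getAsDouble()); // 3.0
        System.out.println(SUM.apply(DoubleStream.of(3.0, 1.0, 2.0)).getAsDouble()); // 6.0
        System.out.println(AVG.apply(DoubleStream.of(3.0, 1.0, 2.0)).getAsDouble()); // 2.0
        System.out.println(SUM.apply(DoubleStream.empty()).isPresent());              // false
    }
}
```

A `SUM` built this way returns an empty `OptionalDouble` for an empty batch, whereas `DoubleStream.sum()` returns `0.0`. `sum()` may also use compensated summation, so for long inputs the two can differ slightly.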

The methods `average()` and `max()` can both be referenced as a `Function<DoubleStream, OptionalDouble>`, so you can store them in variables of that type:

Function<DoubleStream, OptionalDouble> avg = DoubleStream::average; // i.e. ds -> ds.average()
Function<DoubleStream, OptionalDouble> mx = DoubleStream::max;

Then take the function as a parameter and call `.apply` on it with the `DoubleStream` as the argument:

static Stream<TimeValues> method(Stream<TimeValues> data, int batchSize,
                                 Function<DoubleStream, OptionalDouble> fct) {
    return data
            .collect(SlidingCollector.batch(batchSize))
            .stream()
            .map(tvl -> {
                OptionalDouble od = fct.apply(tvl.stream().mapToDouble(TimeValues::doubleValue));
                ... // build and return a TimeValues from od, as in the question
            });
}

So the calls become:

Function<DoubleStream, OptionalDouble> avg = ds -> ds.average();
Function<DoubleStream, OptionalDouble> mx = DoubleStream::max;

method(..., 10, avg);
method(..., 10, mx);
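The question also asks for min and sum. One way to keep all four reductions in one place is an enum whose constants are themselves `Function<DoubleStream, OptionalDouble>` values. This is a minimal sketch, and the name `Reduction` is hypothetical. `sum()` is the odd one out because it returns a plain `double`, so it is wrapped with `OptionalDouble.of`.

```java
import java.util.OptionalDouble;
import java.util.function.Function;
import java.util.stream.DoubleStream;

public enum Reduction implements Function<DoubleStream, OptionalDouble> {
    MIN(DoubleStream::min),
    MAX(DoubleStream::max),
    AVERAGE(DoubleStream::average),
    // sum() returns a plain double (0.0 for an empty stream), so wrap it.
    SUM(ds -> OptionalDouble.of(ds.sum()));

    private final Function<DoubleStream, OptionalDouble> op;

    Reduction(Function<DoubleStream, OptionalDouble> op) {
        this.op = op;
    }

    // Delegates to the reduction chosen for this constant.
    @Override
    public OptionalDouble apply(DoubleStream ds) {
        return op.apply(ds);
    }
}
```

Each constant can be passed wherever `avg` or `mx` was, e.g. `method(data, 10, Reduction.SUM)`, where `data` stands for the stream elided in the calls above.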
azro