
Given the following class:

public class TimeInterval {
    private ZonedDateTime time;
    private Double value1;
    private Double value2;
}

where TimeInterval.time marks intervals within a day, I want to group by day and aggregate the values. The tricky part is that I want to apply two different types of aggregation depending on a certain condition.

For example:

2018-01-01T00:00, 1.0, 2.0
2018-01-01T08:00, null, null
2018-01-01T16:00, 5.0, 6.0
2018-01-02T00:00, 1.0, 2.0
2018-01-02T08:00, 3.0, 4.0
2018-01-02T16:00, 5.0, 6.0
2018-01-03T00:00, null, null
2018-01-03T08:00, null, null
2018-01-03T16:00, null, null

Should be aggregated to:

2018-01-01, 32.0 - nulls are replaced with 0.0 in this case
2018-01-02, 44.0 - all values are valid
2018-01-03, null - all intervals have null values, so the final value is null

The aggregation function is value1 * value2, summed over the day. The point is that I want nulls to be replaced with 0.0 when only some of the intervals have null values (2018-01-01 in the example above), but if all the intervals have null values, I want the final value for the day to be null (2018-01-03 in the example above).
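To pin the rule down before reaching for streams, here is a sketch of it as a plain loop, which could serve as a reference to test stream-based versions against. It assumes a hypothetical record `Interval` (Java 16+) standing in for TimeInterval, with accessors instead of private fields, and UTC timestamps; the sample data is the example above.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DailyRuleSketch {
    // Hypothetical stand-in for TimeInterval (a record, so Java 16+)
    record Interval(ZonedDateTime time, Double value1, Double value2) {}

    // Reference loop: per day, sum value1 * value2, skipping null pairs (same as adding 0.0),
    // but map the day to null when none of its intervals has values.
    static Map<LocalDate, Double> aggregate(List<Interval> intervals) {
        Map<LocalDate, Double> result = new TreeMap<>(); // sorted by day, allows null values
        for (Interval i : intervals) {
            LocalDate day = i.time().toLocalDate();
            if (i.value1() == null || i.value2() == null) {
                result.putIfAbsent(day, null); // remember the day, keep any existing sum
            } else {
                double product = i.value1() * i.value2();
                Double current = result.get(day);
                result.put(day, current == null ? product : current + product);
            }
        }
        return result;
    }

    static Interval at(int day, int hour, Double v1, Double v2) {
        return new Interval(ZonedDateTime.of(2018, 1, day, hour, 0, 0, 0, ZoneOffset.UTC), v1, v2);
    }

    static List<Interval> sample() {
        return List.of(
            at(1, 0, 1.0, 2.0), at(1, 8, null, null), at(1, 16, 5.0, 6.0),
            at(2, 0, 1.0, 2.0), at(2, 8, 3.0, 4.0), at(2, 16, 5.0, 6.0),
            at(3, 0, null, null), at(3, 8, null, null), at(3, 16, null, null));
    }

    public static void main(String[] args) {
        // prints {2018-01-01=32.0, 2018-01-02=44.0, 2018-01-03=null}
        System.out.println(aggregate(sample()));
    }
}
```

The null marker for a day is only overwritten once a non-null pair shows up, which is exactly the "all null means null" condition.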

How do I do that with Java streams?

Mustafa Özçetin
  • If value1 is null, then value2 will always be null as well, and yes, the function is sum(value1 * value2). My biggest problem is that I need to return null in the special case where all the intervals in the day are null. – Jordan Jordanovski Jul 07 '23 at 12:49

3 Answers


You can use

// static imports from java.util.stream.Collectors; filtering requires Java 9+
Map<LocalDate, Double> map = list.stream()
    .collect(groupingBy(ti -> ti.time.toLocalDate(),
        collectingAndThen(
            filtering(ti -> ti.value1 != null,
                mapping(ti -> ti.value1 * ti.value2, reducing(Double::sum))),
            o -> o.orElse(null))
    ));

Normally, you would use the toMap collector for a reduction like this, but toMap does not allow the functions to return null. Further, conditional expressions that mix numeric expressions and null are hard to understand and can easily lead to exceptions.

So this solution first filters out the null values and then uses the reducing collector, which produces an Optional that is empty when all values were null. In a final step, orElse(null) substitutes the desired null result for the empty Optional.
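Both points of this explanation can be checked in isolation. The sketch below (the `Pair` record and the method names are hypothetical; the record needs Java 16+, `filtering` Java 9+) shows that toMap throws a NullPointerException when its value mapper yields null, and that the filtering/mapping/reducing chain yields Optional.empty for a day without values, which orElse(null) then turns into null.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ReductionSketch {
    // Hypothetical minimal stand-in for one interval's two values
    record Pair(Double value1, Double value2) {}

    // toMap: a value mapper that yields null makes the collector throw a NullPointerException
    static boolean toMapRejectsNullValue() {
        try {
            List.of("2018-01-03").stream()
                .collect(Collectors.toMap(day -> day, day -> (Double) null));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Same downstream as in the answer: drop null pairs, multiply, reduce with Double::sum.
    // reducing(Double::sum) yields Optional.empty() when nothing survives the filter.
    static Optional<Double> reduceDay(List<Pair> day) {
        return day.stream().collect(
            Collectors.filtering(p -> p.value1() != null,
                Collectors.mapping(p -> p.value1() * p.value2(),
                    Collectors.reducing(Double::sum))));
    }

    // Final step: an empty Optional becomes null
    static Double daySum(List<Pair> day) {
        return reduceDay(day).orElse(null);
    }

    public static void main(String[] args) {
        List<Pair> jan1 = List.of(new Pair(1.0, 2.0), new Pair(null, null), new Pair(5.0, 6.0));
        List<Pair> jan3 = List.of(new Pair(null, null), new Pair(null, null));
        System.out.println(toMapRejectsNullValue()); // true
        System.out.println(reduceDay(jan1));         // Optional[32.0]
        System.out.println(reduceDay(jan3));         // Optional.empty
        System.out.println(daySum(jan3));            // null
    }
}
```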

Demo on onecompiler.com

Holger

Well, while I was writing my nice answer, @orhtej2 was faster :)

Anyway, I hope you will benefit from my solution too, for the sake of readability.

I prefer adding a few methods to the classes, so that the Collector itself is very simple and straightforward:

final Map<LocalDate, DaySum> result = list.stream()
    .collect(Collectors.groupingBy(
        TimeInterval::getDatePart,   // how to group the elements
        Collector.of(
            DaySum::new,             // how to create a new accumulator
            DaySum::addTimeInterval, // how to add a single element
            DaySum::combine          // how to combine two accumulators - typically only needed when the stream runs in parallel
        ))
    );

Here is the full code, then:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Value;
import lombok.With;
import org.junit.jupiter.api.Test;

import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class AggregationTest {
    @Test
    void aggregate() {
        final List<TimeInterval> list = List.of(
            new TimeInterval(ZonedDateTime.of(2018, 1, 1,  0, 0, 0, 0, ZoneId.systemDefault()), 1.0, 2.0),
            new TimeInterval(ZonedDateTime.of(2018, 1, 1,  8, 0, 0, 0, ZoneId.systemDefault()), null, null),
            new TimeInterval(ZonedDateTime.of(2018, 1, 1, 16, 0, 0, 0, ZoneId.systemDefault()), 5.0, 6.0),
            new TimeInterval(ZonedDateTime.of(2018, 1, 2,  0, 0, 0, 0, ZoneId.systemDefault()), 1.0, 2.0),
            new TimeInterval(ZonedDateTime.of(2018, 1, 2,  8, 0, 0, 0, ZoneId.systemDefault()), 3.0, 4.0),
            new TimeInterval(ZonedDateTime.of(2018, 1, 2, 16, 0, 0, 0, ZoneId.systemDefault()), 5.0, 6.0),
            new TimeInterval(ZonedDateTime.of(2018, 1, 3,  0, 0, 0, 0, ZoneId.systemDefault()), null, null),
            new TimeInterval(ZonedDateTime.of(2018, 1, 3,  8, 0, 0, 0, ZoneId.systemDefault()), null, null),
            new TimeInterval(ZonedDateTime.of(2018, 1, 3, 16, 0, 0, 0, ZoneId.systemDefault()), null, null)
        );
        final Map<LocalDate, DaySum> expected = Map.of(
            LocalDate.of(2018, 1, 1), new DaySum(32.0),
            LocalDate.of(2018, 1, 2), new DaySum(44.0),
            LocalDate.of(2018, 1, 3), new DaySum(null)
        );

        final Map<LocalDate, DaySum> result = list.stream()
            .collect(Collectors.groupingBy(
                TimeInterval::getDatePart,
                Collector.of(
                    DaySum::new,
                    DaySum::addTimeInterval,
                    DaySum::combine
                ))
            );

        assertEquals(expected, result);

        // Visualisation:
        System.out.println("===== INPUT =====");
        list.forEach(System.out::println);
        System.out.println("===== OUTPUT =====");
        System.out.println("result = " + result);
    }
}

@Value
class TimeInterval {
    ZonedDateTime time;
    Double value1;
    Double value2;

    public LocalDate getDatePart() {
        return time.toLocalDate();
    }
}

/**
 * DaySum is a mutable (read-write) object,
 * so it can easily be used in the Collector calculations.
 * If you need an immutable object, create one more class and
 * pass a 4th parameter (a finisher) to Collector.of() to convert it.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
class DaySum {
    @With
    Double sum;

    /**
     * The crucial method, responsible for the logic you requested.
     */
    public void addTimeInterval(TimeInterval timeInterval) {
        if (timeInterval.getValue1() == null || timeInterval.getValue2() == null) {
            return;
        }
        final double product = timeInterval.getValue1() * timeInterval.getValue2();
        if (this.sum == null) {
            this.sum = product;
        }
        else {
            this.sum += product;
        }
    }

    /**
     * A Collector requires such a method for combining interim results
     * in case of parallel processing.
     * The idea is that for the same group, the DaySum may be
     * calculated concurrently into multiple objects.
     * The Collector uses this method to combine them into one 
     * at the end of the computation.
     */
    public DaySum combine(DaySum another) {
        if (another == null || another.sum == null) {
            return this;
        }
        if (this.sum == null) {
            return another;
        }
        return this.withSum(this.sum + another.sum);
    }
}
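The DaySum Javadoc mentions passing a 4th parameter (the finisher) to Collector.of() to get an immutable result. Here is a minimal plain-Java sketch of that variant, without Lombok; `Interval`, `Acc` and `DayTotal` are hypothetical names, records need Java 16+, and a boolean flag replaces the nullable sum inside the accumulator.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class FinisherSketch {
    // Hypothetical stand-in for TimeInterval
    record Interval(ZonedDateTime time, Double value1, Double value2) {}

    // Hypothetical immutable result; sum is null when no interval of the day had values
    record DayTotal(Double sum) {}

    // Mutable accumulator, only used while the collector runs
    static final class Acc {
        private double sum;
        private boolean seenValue; // true once a non-null pair has been added

        void add(Interval i) {
            if (i.value1() != null && i.value2() != null) {
                sum += i.value1() * i.value2();
                seenValue = true;
            }
        }

        Acc combine(Acc other) {
            sum += other.sum;
            seenValue |= other.seenValue;
            return this;
        }

        DayTotal finish() {
            return new DayTotal(seenValue ? Double.valueOf(sum) : null);
        }
    }

    // The 4th argument of Collector.of() is the finisher converting Acc into DayTotal
    static final Collector<Interval, Acc, DayTotal> DAY_TOTAL =
        Collector.of(Acc::new, Acc::add, Acc::combine, Acc::finish);

    static Map<LocalDate, DayTotal> aggregate(List<Interval> intervals) {
        return intervals.stream()
            .collect(Collectors.groupingBy(i -> i.time().toLocalDate(), DAY_TOTAL));
    }

    static Interval at(int day, int hour, Double v1, Double v2) {
        return new Interval(ZonedDateTime.of(2018, 1, day, hour, 0, 0, 0, ZoneOffset.UTC), v1, v2);
    }

    static List<Interval> sample() {
        return List.of(
            at(1, 0, 1.0, 2.0), at(1, 8, null, null), at(1, 16, 5.0, 6.0),
            at(2, 0, 1.0, 2.0), at(2, 8, 3.0, 4.0), at(2, 16, 5.0, 6.0),
            at(3, 0, null, null), at(3, 8, null, null), at(3, 16, null, null));
    }

    public static void main(String[] args) {
        aggregate(sample()).forEach((day, total) -> System.out.println(day + " -> " + total));
    }
}
```

Keeping the mutable state in Acc means the DayTotal values never change after collection; the trade-off is one extra class.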

N.B.

If you are sure that your stream will only ever be executed sequentially, you might start questioning whether you really need to implement the combiner. The question "Can a Collector's combiner function ever be used on sequential streams?" has great answers dedicated to this topic; it is worth reading before you decide to implement your combiner as a no-op function :)
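Regarding the combiner and the SonarQube coverage concern raised in the comments: rather than relying on a parallel stream happening to split the data, a unit test could call the combiner directly and check the consistency requirement from the Collector Javadoc (accumulating all elements into one container must give the same result as accumulating two parts separately and combining them). This sketch ports the DaySum logic to plain Java without Lombok; `Pair` and `combinerAgrees` are hypothetical names, and the record needs Java 16+.

```java
import java.util.List;
import java.util.Objects;

public class CombinerContractSketch {
    // Hypothetical minimal stand-in for one interval's two values
    record Pair(Double value1, Double value2) {}

    // Plain-Java port of the DaySum idea: sum stays null until a non-null pair arrives
    static final class DaySum {
        Double sum;

        void add(Pair p) {
            if (p.value1() == null || p.value2() == null) {
                return;
            }
            double product = p.value1() * p.value2();
            sum = (sum == null) ? product : sum + product;
        }

        DaySum combine(DaySum other) {
            if (other.sum == null) {
                return this;
            }
            if (sum == null) {
                return other;
            }
            sum += other.sum;
            return this;
        }
    }

    // Accumulate everything in one container vs. two halves split at 'split', then combine
    static boolean combinerAgrees(List<Pair> items, int split) {
        DaySum whole = new DaySum();
        items.forEach(whole::add);

        DaySum left = new DaySum();
        items.subList(0, split).forEach(left::add);
        DaySum right = new DaySum();
        items.subList(split, items.size()).forEach(right::add);

        return Objects.equals(whole.sum, left.combine(right).sum);
    }

    public static void main(String[] args) {
        List<Pair> day1 = List.of(new Pair(1.0, 2.0), new Pair(null, null), new Pair(5.0, 6.0));
        for (int split = 0; split <= day1.size(); split++) {
            System.out.println("split " + split + ": " + combinerAgrees(day1, split));
        }
    }
}
```

With arbitrary doubles, regrouping the additions can change the last bits of the sum, so a real test might compare with a small tolerance; the sample values here add up exactly.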

Honza Zidek
  • Nice, I like moving aggregation to `DaySum` class from readability standpoint. – orhtej2 Jul 07 '23 at 14:36
  • Is there a way to avoid the combiner, since the stream is sequential? I get SonarQube warnings because the combiner code is never executed. – Jordan Jordanovski Jul 09 '23 at 16:44
  • @HonzaZidek great, thanks for the advice :) – Jordan Jordanovski Jul 10 '23 at 06:57
  • @JordanJordanovski I researched more about the `combiner` and I am completely withdrawing my previous comment. Please see the edit of my answer, read the linked StackOverflow question and consider, if you really want to take the risk of not implementing the combiner. – Honza Zidek Jul 10 '23 at 11:28

This can be done by grouping the stream with the Collectors.groupingBy(..) overload that takes a downstream Collector, here a custom one created using Collector.of(...) with a custom finisher (the step that transforms the intermediate representation into the final form).

List<TimeInterval> input = ... // provide something

Map<LocalDate, Double> output = input.stream().collect(
  // Produces Map<K, V>, allowing selection of grouping key.
  Collectors.groupingBy(
    e -> e.time.toLocalDate(), 
    // Custom collector that first gathers all TimeIntervals for a given day,
    // then returns either the sum or null, depending on whether any entry has non-null values
    Collector.of(
      ArrayList<TimeInterval>::new,
      ArrayList<TimeInterval>::add,
      (a, b) -> { a.addAll(b); return a; },
      l -> {
        var filtered = l.stream().filter(e -> e.value1 != null && e.value2 != null).toList();
        if (filtered.isEmpty())
          // All entries of this day contain null numbers. Returning null (rather than
          // filtering these entries out before grouping) keeps the day as a key in the map.
          return null;

        return filtered.stream().collect(Collectors.summingDouble(e -> e.value1 * e.value2));
      })));

Output (Collectors.groupingBy() most likely produces a HashMap, hence the arbitrary ordering):

2018-01-03 null
2018-01-02 44.000000
2018-01-01 32.000000
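If the days should come out in order, the three-argument groupingBy overload accepts a map factory, and passing TreeMap::new keeps the keys sorted by date. A sketch reusing the filtering/reducing downstream from the first answer, with a hypothetical `Interval` record (Java 16+) in place of TimeInterval:

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class SortedDaysSketch {
    // Hypothetical stand-in for TimeInterval
    record Interval(ZonedDateTime time, Double value1, Double value2) {}

    static TreeMap<LocalDate, Double> aggregate(List<Interval> intervals) {
        return intervals.stream().collect(Collectors.groupingBy(
            i -> i.time().toLocalDate(),
            TreeMap::new, // map factory: keys are kept in ascending date order
            Collectors.collectingAndThen(
                Collectors.filtering(i -> i.value1() != null && i.value2() != null,
                    Collectors.mapping(i -> i.value1() * i.value2(),
                        Collectors.reducing(Double::sum))),
                (Optional<Double> o) -> o.orElse(null)))); // day without values -> null
    }

    static Interval at(int day, int hour, Double v1, Double v2) {
        return new Interval(ZonedDateTime.of(2018, 1, day, hour, 0, 0, 0, ZoneOffset.UTC), v1, v2);
    }

    static List<Interval> sample() {
        return List.of(
            at(1, 0, 1.0, 2.0), at(1, 8, null, null), at(1, 16, 5.0, 6.0),
            at(2, 0, 1.0, 2.0), at(2, 8, 3.0, 4.0), at(2, 16, 5.0, 6.0),
            at(3, 0, null, null), at(3, 8, null, null), at(3, 16, null, null));
    }

    public static void main(String[] args) {
        // prints {2018-01-01=32.0, 2018-01-02=44.0, 2018-01-03=null}
        System.out.println(aggregate(sample()));
    }
}
```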
orhtej2