The problem at hand is to return the change between the first and last known measurement in a certain interval from a series of timestamp/value data. Also, I'd like to learn to use Java 8 Streams, so I'm trying to see if and how this could solve the problem.

A sample of the data:

DateTime,Value
...
1470012671,618.59
1470012912,618.62
1470013212,618.65
1470013512,618.68
1470013632,618.69
1470013900,618.71
...

Example input:

startMillis: 1470012800
endMillis: 1470013800

Expected answer (when the start and end times are not present in the data, I take the nearest readings inside the interval; see the bonus question below):

618.69 - 618.62 = 0.07

The code I have so far:

double amountKiloWattHours = 0;
long startMillis = startingTime.toEpochSecond();
long endMillis = startMillis + PERIOD_LENGTH_MILLIS;
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
    // Currently, only returns all pairs within range...
    Stream<Pair> pairs = stream
        .skip(1)
        .map(p -> {
            return new Pair(p);
        })
        .filter(pair -> {
            return (pair.getMillis() > startMillis) && (pair.getMillis() < endMillis);
        });
} catch (Exception e) {
    // TODO specify and handle exceptions...
}

public class Pair {
    @Getter
    private final long millis;
    @Getter
    private final double kWhs;

    public Pair(String input) {
        String[] parts = input.split(",");
        this.millis = Long.parseLong(parts[0]);
        this.kWhs = Double.parseDouble(parts[1]);
    }
}

How do I now get the difference between the value of the last and the first pair in the interval?

Bonus question: How do I get the interpolated result where the exact timestamps' value is linearly interpolated between two surrounding values?

Lukas Eder
wearego
  • You can do it on streams (for example using reduction) but that's going to be awfully ugly. Java was not designed with such operations in mind, and it shows. – mszymborski Aug 16 '16 at 15:57
  • As a side note, you can replace `p -> { return new Pair(p); }` with either, `p -> new Pair(p)` or even simpler `Pair::new`. – Holger Aug 17 '16 at 09:25

3 Answers

2

There's no ready-made collector that finds both the maximal and minimal elements at once, but you can use the pairing collector I wrote in this answer (it is also readily available in my StreamEx library -- see JavaDoc):

Comparator<Pair> cmp = Comparator.comparingLong(Pair::getMillis);

double diff = stream
    .skip(1)
    .map(Pair::new)
    .filter(pair -> (pair.getMillis() > startMillis) && (pair.getMillis() < endMillis))
    .collect(pairing(Collectors.maxBy(cmp), Collectors.minBy(cmp),
      (maxPair, minPair) -> maxPair.get().getKWhs() - minPair.get().getKWhs()));

This will not collect unnecessary data: only the min and max rows are kept.

Note: this code assumes that at least one Pair satisfies the condition (i.e. at least one timestamp/value instance lies within the range; the minimum and maximum need not be distinct). If you want to handle the case where no such pair exists specially (for example, by returning 0), modify the finisher accordingly, for example: maxPair.map(Pair::getKWhs).orElse(0d) - minPair.map(Pair::getKWhs).orElse(0d).
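
On Java 12 and later, the JDK's own `Collectors.teeing` covers the same pairing idea without an extra library. The following is only a minimal sketch under some assumptions: the nested `Pair` is a hand-written stand-in for the question's class, the lines come from an in-memory list rather than the file, and the names `TeeingDiff` and `diff` are made up for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class TeeingDiff {
    // Stand-in for the question's Pair class (Lombok getters written out by hand).
    static final class Pair {
        private final long millis;
        private final double kWhs;

        Pair(String input) {
            String[] parts = input.split(",");
            this.millis = Long.parseLong(parts[0]);
            this.kWhs = Double.parseDouble(parts[1]);
        }

        long getMillis() { return millis; }
        double getKWhs() { return kWhs; }
    }

    // Value of the newest reading minus value of the oldest reading strictly
    // inside (startMillis, endMillis); returns 0 when no reading is in range.
    static double diff(List<String> lines, long startMillis, long endMillis) {
        Comparator<Pair> cmp = Comparator.comparingLong(Pair::getMillis);
        return lines.stream()
            .map(Pair::new)
            .filter(p -> p.getMillis() > startMillis && p.getMillis() < endMillis)
            .collect(Collectors.teeing(
                Collectors.maxBy(cmp),
                Collectors.minBy(cmp),
                (max, min) -> max.map(Pair::getKWhs).orElse(0d)
                            - min.map(Pair::getKWhs).orElse(0d)));
    }

    public static void main(String[] args) {
        // Sample rows from the question (header line already removed).
        List<String> sample = Arrays.asList(
            "1470012671,618.59", "1470012912,618.62", "1470013212,618.65",
            "1470013512,618.68", "1470013632,618.69", "1470013900,618.71");
        System.out.println(diff(sample, 1470012800L, 1470013800L)); // roughly 0.07
    }
}
```

As with the pairing collector, only the current minimum and maximum are held while the stream is processed.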

Community
Tagir Valeev
  • This assumes the data to be sorted or that the OP’s words “last and first” actually meant “newest and oldest”, which might be the case, but should be mentioned. – Holger Aug 17 '16 at 09:23
  • It looks like an elegant approach. Too bad I have to add a new library to accomplish this. Is this such a rare case? – wearego Aug 17 '16 at 10:02
  • Also, what if there is only one element within the range? – wearego Aug 17 '16 at 10:04
  • @wearego you can just copy-paste the collector code to some utility class in your project, no need to add a library. If range contains only one element, then maxPair and minPair will be the same (that only element), so the difference will be 0. – Tagir Valeev Aug 17 '16 at 10:07
  • @wearego: Re rare case -- jdk provides a few summarizing collectors that can find **both** min and max (besides count, sum and average), but those don't handle pairs. – charlie Aug 17 '16 at 11:32
0

It's still handy to filter out the relevant "in-range" records using streams:

<T, U extends Comparable<U>> Predicate<T> inRange(U start, U end, Function<T, U> get) {
    return t -> Optional.of(t)
            .map(get)
            .filter(u -> start.compareTo(u) < 0 && 0 < end.compareTo(u))
            .isPresent();
}

List<Double> list = stream.skip(1)
        .map(Pair::new)
        .filter(inRange(startMillis, endMillis, Pair::getMillis))
        .map(Pair::getKWhs)
        .collect(Collectors.toList());

double diff = list.get(list.size() - 1) - list.get(0);

But then you still need to hand-pick the first and the last element from the list to subtract them.
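
If the readings can be assumed never to decrease (an assumption, not something the question states), one way to skip the intermediate list is the JDK's `DoubleSummaryStatistics`, which tracks the minimum and maximum value in a single pass. A rough sketch, with `SummaryDiff` and `diff` as made-up names and an in-memory list standing in for the file:

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;

class SummaryDiff {
    // Max minus min of the values whose timestamp lies strictly inside
    // (startMillis, endMillis). This equals "last minus first" only if the
    // values never decrease over time (assumed here).
    static double diff(List<String> lines, long startMillis, long endMillis) {
        DoubleSummaryStatistics stats = lines.stream()
            .map(line -> line.split(","))
            .filter(parts -> {
                long millis = Long.parseLong(parts[0]);
                return millis > startMillis && millis < endMillis;
            })
            .mapToDouble(parts -> Double.parseDouble(parts[1]))
            .summaryStatistics();
        // With no readings in range, getMax()/getMin() are infinities, so guard on the count.
        return stats.getCount() == 0 ? 0 : stats.getMax() - stats.getMin();
    }

    public static void main(String[] args) {
        // Sample rows from the question (header line already removed).
        List<String> sample = Arrays.asList(
            "1470012671,618.59", "1470012912,618.62", "1470013212,618.65",
            "1470013512,618.68", "1470013632,618.69", "1470013900,618.71");
        System.out.println(diff(sample, 1470012800L, 1470013800L)); // roughly 0.07
    }
}
```

Unlike the list version, this does not throw when the range is empty, but it gives a wrong answer if a later reading can be lower than an earlier one.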

charlie
  • I see you extracted the predicate to a separate function. What are the advantages of doing this over inline? – wearego Aug 17 '16 at 09:44
  • Imo, the advantage of this approach is that there are no additional libraries required. Though, I was hoping it wouldn't have to be stored as a list first. In my case the list will not be huge, so I assume the memory it uses is negligible. – wearego Aug 17 '16 at 09:55
  • @wearego: Extracting the predicate improves reuse, readability, testability, and expresses the intent clearly. It's not necessary here, though. – charlie Aug 17 '16 at 11:49
  • @wearego: Finding **both** min and max is a stateful operation in any case. Without collecting to a list, you can find both min and max by iterating over the stream (`.forEach()`, `.forEachOrdered()`, `.iterator()`) and updating a few external state variables (it prevents parallelism, however). Or use a lib (jOOL or StreamEx) to do it for you by pairing `Collectors.minBy()` and `Collectors.maxBy()`. – charlie Aug 17 '16 at 12:23
0

These are meter readings so I will assume both millis and kWhs are monotonically increasing. I would use the BiFunction form of the reduce stream function. I would create an accumulator class that accumulates the first and last seen values (i.e., a Pair). That class would look something like this:

private class Ac {
    private final Pair first;
    private final Pair last;

    public Ac() {
        this.first = null;
        this.last = null;
    }

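    // Combiner: merges two partial results (only invoked for parallel streams).
    public Ac(Ac prev, Ac next) {
        this.first = prev.first != null ? prev.first : next.first;
        this.last = next.last != null ? next.last : prev.last;
    }

    // Accumulator: the first pair seen is both first and last; later pairs only move last.
    public Ac(Ac prev, Pair next) {
        this.first = prev.first != null ? prev.first : next;
        this.last = next;
    }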

    public double diff() {
        return first != null && last != null ? last.getKWhs() - first.getKWhs() : 0;
    }
}

Supporting interpolation requires a little more work, but it's just more of the same.

The stream part of the code would look like the following:

try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
    Ac result = stream
                .skip(1)
                .map(x -> new Pair(x))
                .filter(y -> (y.getMillis() > startMillis) && (y.getMillis() < endMillis))
                .reduce(new Ac(), (u, t) -> new Ac(u, t), (u1, u2) -> new Ac(u1, u2));
    amountKiloWattHours = result.diff();
} catch (Exception e) {
    // TODO specify and handle exceptions...
}

There are a lot of pairwise processing problems that can be solved with non-trivial accumulators, but I would be wary of introducing this code unless I could justify the complexity. Am I enhancing reusability or not? Am I improving readability or not? Streams and lambda expressions are great, but they are not always the right solution.
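
On the interpolation mentioned above, the arithmetic itself is short. The sketch below is one possible way to do it, not part of the reduce-based code: for clarity it loads the readings into a `TreeMap` (so it gives up the space savings of the accumulator), and the names `Interpolation`, `parse` and `valueAt` are invented for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

class Interpolation {
    // Parses "timestamp,value" lines into a map sorted by timestamp.
    static TreeMap<Long, Double> parse(String... lines) {
        TreeMap<Long, Double> readings = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            readings.put(Long.parseLong(parts[0]), Double.parseDouble(parts[1]));
        }
        return readings;
    }

    // Linearly interpolated value at time t, using the nearest reading at or
    // before t and the nearest reading at or after t.
    static double valueAt(TreeMap<Long, Double> readings, long t) {
        Map.Entry<Long, Double> before = readings.floorEntry(t);
        Map.Entry<Long, Double> after = readings.ceilingEntry(t);
        if (before == null || after == null) {
            throw new IllegalArgumentException("timestamp outside the recorded range: " + t);
        }
        if (before.getKey().equals(after.getKey())) {
            return before.getValue(); // exact hit, nothing to interpolate
        }
        double fraction = (double) (t - before.getKey()) / (after.getKey() - before.getKey());
        return before.getValue() + fraction * (after.getValue() - before.getValue());
    }

    public static void main(String[] args) {
        // Sample rows from the question (header line already removed).
        TreeMap<Long, Double> readings = parse(
            "1470012671,618.59", "1470012912,618.62", "1470013212,618.65",
            "1470013512,618.68", "1470013632,618.69", "1470013900,618.71");
        double diff = valueAt(readings, 1470013800L) - valueAt(readings, 1470012800L);
        System.out.println(diff); // roughly 0.0965 for the question's sample interval
    }
}
```

A streaming variant would instead have the accumulator remember the last reading before the interval and the first reading after it, then apply the same formula at both ends.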

John Morris
  • I see you are using `reduce` where another answer uses `collect`. What is the difference? – wearego Aug 24 '16 at 11:24
  • Consider the possibility that I want the kWh difference between the start and end of the billing period, ~30 days*24 hours*3600 seconds*1000 millis. That's 2.5 billion entries. Saving that many entries in a collection is problematic for a single stream of data. Using that approach would probably be disastrous for a reasonable set of utility customers. The reduce approach is much more space efficient. 2.5 billion entries versus 2 entries. We still have to scan 2.5 billion entries, but we don't store them when processing. I'm assuming no indexing. – John Morris Aug 25 '16 at 15:40