21

Assuming I have a Java IntStream, is it possible to convert it to an IntStream with cumulative sums? For example, a stream starting with [4, 2, 6, ...] should be converted to [4, 6, 12, ...].

More generally, how should one go about implementing stateful stream operations? It feels like this should be possible:

myIntStream.map(new IntUnaryOperator() {
    int sum = 0; // running total carried between elements

    @Override
    public int applyAsInt(int value) {
        return sum += value;
    }
});

This comes with the obvious restriction that it works only on sequential streams. However, Stream.map explicitly requires a stateless map function. Am I right in missing a Stream.statefulMap or Stream.cumulative operation, or is that missing the point of Java streams?

Compare for example to Haskell, where the scanl1 function solves exactly this example:

scanl1 (+) [1, 2, 3, 4] == [1, 3, 6, 10]
Adrian Leonhard
  • 3
    Streams are pretty deliberately designed to only support parallelizable operations, which isn't really the case for scanl. – Louis Wasserman Feb 05 '15 at 23:31
  • 2
    Streams make more sense if you pretend they're in `java.util.concurrent.stream` instead of `java.util.stream`. – Jeffrey Bosboom Feb 05 '15 at 23:48
  • 9
    Note that if your source is an array, you can simply use [`Arrays.parallelPrefix(array, Integer::sum);`](http://docs.oracle.com/javase/8/docs/api/java/util/Arrays.html#parallelPrefix-int:A-java.util.function.IntBinaryOperator-)… – Holger Feb 06 '15 at 11:38
  • 3
    You want Arrays.parallelPrefix. – Brian Goetz Feb 06 '15 at 15:53
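
The Arrays.parallelPrefix suggestion from the comments above can be sketched as follows. This is a minimal illustration, assuming it is acceptable to materialize the whole stream into an array first (so it does not apply to infinite streams); the class and method names PrefixSums and cumulativeSums are made up for this example.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class PrefixSums {
    // Hypothetical helper: consumes the stream fully, then computes
    // running totals in place with Arrays.parallelPrefix.
    static int[] cumulativeSums(IntStream ints) {
        int[] values = ints.toArray();
        Arrays.parallelPrefix(values, Integer::sum);
        return values;
    }

    public static void main(String[] args) {
        int[] sums = cumulativeSums(IntStream.of(4, 2, 6));
        System.out.println(Arrays.toString(sums)); // prints [4, 6, 12]
    }
}
```

If a stream is needed afterwards, the resulting array can be wrapped again with Arrays.stream(sums).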

2 Answers

11

You can do this with an atomic number. For example:

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class Accumulator {
    public static LongStream toCumulativeSumStream(IntStream ints){
        AtomicLong sum = new AtomicLong(0);
        return ints.sequential().mapToLong(sum::addAndGet);
    }

    public static void main(String[] args){
        LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
        sums.forEachOrdered(System.out::println);
    }
}

This outputs:

1
3
6
10

I've used a long to store the sums, because the sum of two ints can easily exceed Integer.MAX_VALUE, and a long is far less likely to overflow.
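
To make the overflow concern concrete, here is a small sketch comparing int and long addition at the boundary. The class OverflowDemo and its helper methods are hypothetical names used only for illustration; Math.addExact is shown as one way to detect the overflow instead of widening.

```java
class OverflowDemo {
    // Plain int addition wraps around silently on overflow.
    static int intSum(int a, int b) {
        return a + b;
    }

    // Widening to long before adding keeps the exact result.
    static long longSum(int a, int b) {
        return (long) a + b;
    }

    public static void main(String[] args) {
        System.out.println(intSum(Integer.MAX_VALUE, 1));  // prints -2147483648
        System.out.println(longSum(Integer.MAX_VALUE, 1)); // prints 2147483648
        try {
            Math.addExact(Integer.MAX_VALUE, 1); // throws instead of wrapping
        } catch (ArithmeticException e) {
            System.out.println("overflow detected");
        }
    }
}
```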

Steve K
  • 2
    I found this answer interesting and unexpected. Could you answer a few questions about it? Why did you use AtomicReference rather than AtomicInteger with addAndGet? But more importantly, why does this change the fact that if the stream was made parallel there's no guarantee of the order in which the accumulation occurs? Does AtomicReference somehow change the behaviour of streams? If so, can you point to a tutorial or documentation on that? Thanks. – sprinter Feb 06 '15 at 07:34
  • 2
    Just a supplement to the previous question, I thought I'd try it myself with a parallel IntStream. It doesn't work. So this isn't a good answer at all. – sprinter Feb 06 '15 at 07:42
  • 2
    @sprinter Yes, this doesn't work on parallel streams, but as the operation cannot be parallelized anyway, one can simply call .sequential() before running it. – Adrian Leonhard Feb 07 '15 at 07:45
  • 1
    As you say, the operation cannot be parallelized, so I've called `.sequential()` within my updated code. I'm also now using an AtomicLong instead of AtomicReference. The question appeared to be "how do I get another stream from this stream", which should never use a Collector or intermediate construct if it's possible to avoid - collecting to a list is a heavy operation, whereas my method takes almost no extra time for a stream of theoretically infinite length... – Steve K Feb 09 '15 at 01:46
5

It's possible to do with a collector that then creates a new stream:

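import java.util.List;

class Accumulator {
    // Appends the running total: the new value plus the last total seen so far.
    public static void accept(List<Integer> list, Integer value) {
        list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
    }

    // Merges list2 into list1, shifting each total by list1's final total.
    // list1 can be empty (e.g. after filter or flatMap), so treat that as 0.
    public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
        int total = list1.isEmpty() ? 0 : list1.get(list1.size() - 1);
        list2.stream().map(n -> n + total).forEach(list1::add);
        return list1;
    }
}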

This is used as:

myIntStream.parallel()
    .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
    .stream();

The important property of this collector is that it stays correct even when the stream is parallel: whenever two partial lists are combined, every total in the second list is shifted by the final total of the first.

This is obviously not as efficient as a map operation, because it collects the whole stream and then produces a new one. But that's not just an implementation detail: it's a necessary consequence of the fact that streams are designed to be processed concurrently.

I have tested it with IntStream.range(0, 10000).parallel() and it functions correctly.
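
A self-contained way to reproduce a check like this is sketched below. It is not the exact test harness used for this answer: the class name ParallelScanCheck and the inlined lambdas are assumptions made for the example, and the combiner treats an empty left-hand list as a total of 0. The result is compared against a plain sequential loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class ParallelScanCheck {
    // Hypothetical helper: cumulative sums via a three-argument collect
    // on a parallel stream.
    static List<Integer> cumulativeSums(IntStream ints) {
        return ints.parallel().collect(
                () -> new ArrayList<Integer>(),
                // accumulator: append value plus the last running total
                (list, value) -> list.add(
                        value + (list.isEmpty() ? 0 : list.get(list.size() - 1))),
                // combiner: shift the right-hand totals by the left-hand final total
                (left, right) -> {
                    int offset = left.isEmpty() ? 0 : left.get(left.size() - 1);
                    for (int n : right) {
                        left.add(n + offset);
                    }
                });
    }

    public static void main(String[] args) {
        List<Integer> actual = cumulativeSums(IntStream.range(0, 10000));

        // Reference result computed with an ordinary sequential loop.
        List<Integer> expected = new ArrayList<>();
        int sum = 0;
        for (int i = 0; i < 10000; i++) {
            sum += i;
            expected.add(sum);
        }
        System.out.println(actual.equals(expected)); // prints true
    }
}
```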

sprinter
  • I must say this is a very good answer. It might not be the perfect answer for this question, but it is for parallel accumulation. – Jatin Feb 06 '15 at 08:36
  • 1
    @Jatin thanks. I actually realised afterwards that it could be quite a bit simpler than this - the running total isn't really required because it's just the last element in the list. I'll update to show you what I mean - let me know what you think. – sprinter Feb 06 '15 at 11:37
  • This is more readable. Thanks. I did not think along parallel lines, that you can have blocks and then merge them. Great. – Jatin Feb 06 '15 at 11:50
  • 1
    Why `Deque` and `LinkedList`, though? `ArrayList` would give you superior performance and memory footprint because you never prepend, only append. You could also cheaply optimize by checking which operand of `combine` is shorter and concat that one onto the longer one. – Marko Topolnik Feb 06 '15 at 12:54
  • 1
    @MarkoTopolnik all excellent points. I chose Deque for a pretty lazy reason: it had a peekLast method! But I agree that ArrayList would be a better choice - I'll update the answer in case anyone tries to use this technique. As far as performance goes I tried it with a stream of 10 million items and it's very quick. – sprinter Feb 06 '15 at 13:05
  • 1
    @MarkoTopolnik as a matter of interest I logged information during combine on a very large stream and found that the lists were always the same length. I guess that must be an artifact of how streams are implemented. – sprinter Feb 06 '15 at 13:29
  • 2
    That's an artifact of how an in-memory stream source, such as an array or arraylist, is split (down the middle). If you used, say, `Files.lines()` or any other IO-based source, you'd be seeing different sizes. Unbalanced trees would also produce non-uniform split sizes. – Marko Topolnik Feb 06 '15 at 13:31
  • You may insert a `filter(…)` to encounter lists of different lengths… – Holger Feb 06 '15 at 19:14
  • 1
    Unfortunately this solution subverts the laziness property of streams, and doesn't work at all on infinite streams. – Adrian Leonhard Feb 07 '15 at 07:42
  • Note that a combine function may encounter empty lists, e.g. if the preceding stream operations contain `filter` or `flatMap`, and has to handle that. It would be enough, if the `combine` method starts with `if(list1.isEmpty()) return list2;`. – Holger Jul 03 '19 at 09:52
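
The laziness concern raised in the comments above could be addressed with an iterator-based scan. The following is only a sketch under stated assumptions, not something proposed in either answer: the class LazyScan and its method cumulativeSums are hypothetical, the resulting stream is strictly sequential, and the state lives in a single-use iterator rather than in a map function.

```java
import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;

class LazyScan {
    // Hypothetical helper: wraps the source iterator so each pulled element
    // yields the running total. Nothing is consumed until the result is.
    static LongStream cumulativeSums(IntStream ints) {
        PrimitiveIterator.OfInt source = ints.iterator();
        PrimitiveIterator.OfLong sums = new PrimitiveIterator.OfLong() {
            private long sum = 0;

            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public long nextLong() {
                return sum += source.nextInt();
            }
        };
        // 'false' requests a sequential stream; the scan cannot be split.
        return StreamSupport.longStream(
                Spliterators.spliteratorUnknownSize(
                        sums, Spliterator.ORDERED | Spliterator.NONNULL),
                false).onClose(ints::close);
    }

    public static void main(String[] args) {
        // Works on an infinite source because elements are pulled on demand.
        long[] firstFive = cumulativeSums(IntStream.iterate(1, i -> i + 1))
                .limit(5)
                .toArray();
        System.out.println(Arrays.toString(firstFive)); // prints [1, 3, 6, 10, 15]
    }
}
```

The trade-off is the reverse of the collector approach: this version keeps laziness and handles infinite streams, but gives up any parallel execution.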