
I am looking for an operation on a Stream that lets me perform a non-terminal (and/or terminal) operation on every nth item. Although I use a stream of primes as an example, the stream could just as easily be web requests, user actions, or some other cold data or live feed.

From this:

    Duration start = Duration.ofNanos(System.nanoTime());

    IntStream.iterate(2, n -> n + 1)
            .filter(Findprimes::isPrime)
            .limit(1_000_1000 * 10)
            .forEach(System.out::println);

    System.out.println("Duration: " + Duration.ofNanos(System.nanoTime()).minus(start));

To a stream function like this:

    IntStream.iterate(2, n -> n + 1)
            .filter(Findprimes::isPrime)
            .limit(1_000_1000 * 10)
            .peekEvery(10, System.out::println)
            .forEach( it -> {});

The Coordinator
  • This sort of operation is really best done *without streams*. Streams just aren't suited to this sort of operation. Don't try to force it. – Louis Wasserman Oct 15 '16 at 15:51
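
For comparison, a minimal sketch of the plain-loop approach the comment suggests. `isPrime` is a trial-division stand-in for the question's `Findprimes::isPrime` (whose implementation isn't shown), and `everyNthPrime` is a hypothetical name. The explicit counter makes the "every nth" bookkeeping visible without relying on any stream semantics.

```java
import java.util.ArrayList;
import java.util.List;

class EveryNthLoop {
    // Trial-division stand-in for the question's Findprimes::isPrime (assumed, not the original)
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Walks the primes with a plain loop and keeps every nth of the first `limit` primes
    static List<Integer> everyNthPrime(int n, int limit) {
        List<Integer> picked = new ArrayList<>();
        int found = 0;
        for (int candidate = 2; found < limit; candidate++) {
            if (!isPrime(candidate)) continue;
            found++;                   // 1-based position of this prime
            if (found % n == 0) {
                picked.add(candidate); // the "every nth" side action goes here
            }
        }
        return picked;
    }

    public static void main(String[] args) {
        everyNthPrime(10, 100).forEach(System.out::println);
    }
}
```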

3 Answers


Create a helper method to wrap the peek() consumer:

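    import java.util.function.IntConsumer;

    // Wraps `consumer` so it is only called for the count-th, 2*count-th, ... value.
    // The counter is plain mutable state, so this is for sequential streams only.
    public static IntConsumer every(int count, IntConsumer consumer) {
        if (count <= 0)
            throw new IllegalArgumentException("Count must be > 0: got " + count);
        return new IntConsumer() {
            private int i;
            @Override
            public void accept(int value) {
                if (++this.i == count) {
                    consumer.accept(value);
                    this.i = 0;
                }
            }
        };
    }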

You can now use it almost exactly like you wanted:

    IntStream.rangeClosed(1, 20)
             .peek(every(5, System.out::println))
             .count();

Output

    5
    10
    15
    20

The helper method can be put in a utility class and statically imported, similar to how the Collectors class consists of nothing but static helper methods.
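
As a sketch of that idea, here is a hypothetical `StreamHelpers` utility class (the class name and the object-stream variant `everyObj` are assumptions, not part of this answer). The object version gets its own name because overloading `every` with both `IntConsumer` and `Consumer<T>` parameters would make calls with inexact method references like `System.out::println` ambiguous. Like the original, these consumers are stateful and only meant for sequential streams.

```java
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical utility class, meant to be statically imported like Collectors
final class StreamHelpers {
    private StreamHelpers() {} // no instances: static helpers only

    // Calls consumer for the count-th, 2*count-th, ... int; stateful, sequential streams only
    static IntConsumer every(int count, IntConsumer consumer) {
        if (count <= 0) throw new IllegalArgumentException("count must be > 0: got " + count);
        int[] seen = {0};
        return value -> {
            if (++seen[0] == count) {
                seen[0] = 0;
                consumer.accept(value);
            }
        };
    }

    // Object-stream counterpart; a separate name avoids overload ambiguity
    static <T> Consumer<T> everyObj(int count, Consumer<? super T> consumer) {
        if (count <= 0) throw new IllegalArgumentException("count must be > 0: got " + count);
        int[] seen = {0};
        return value -> {
            if (++seen[0] == count) {
                seen[0] = 0;
                consumer.accept(value);
            }
        };
    }

    public static void main(String[] args) {
        // prints 5, 10, 15, 20 on separate lines
        IntStream.rangeClosed(1, 20).forEach(every(5, System.out::println));
        // prints b, d, f on separate lines
        Stream.of("a", "b", "c", "d", "e", "f").forEach(everyObj(2, System.out::println));
    }
}
```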

As noted by @user140547 in a comment, this code is not thread-safe, so it cannot be used with parallel streams. Besides, the output order would be messed up, so it doesn't really make sense to use it with parallel streams anyway.

Andreas
  • I'm going to go with this. Although @saka1029 has a more local and readable algorithm, yours is make-once, use-everywhere, and it could fit nicely into any Streamhelper library. Nice! – The Coordinator Oct 15 '16 at 07:57
  • If you use a parallel stream, you might end up with an output like `15 20 10 5`. Also, this implementation is not thread-safe and would need synchronization when used with a parallel stream; otherwise you may get any result. – user140547 Oct 15 '16 at 09:24
  • @user140547: to be correct, if you use this with a parallel Stream, there are tons of different possible outputs, showing *arbitrary* numbers rather than the desired numbers in a different order, and even making the method `synchronized` won’t fix that. This is a consumer that relies on the *processing* order, while the desired result depends on the *encounter* order. Further, `peek` is meant to help with debugging the processing only, not to perform a required action; e.g. under Java 9, this code will do nothing, because the `count` of a `rangeClosed` can be calculated without processing any items… – Holger Oct 17 '16 at 10:31
  • See [In Java streams is peek really only for debugging?](http://stackoverflow.com/a/33636377/2711488); this solution falls exactly into two of the traps mentioned there… – Holger Oct 17 '16 at 10:33
  • @Holger: well true, synchronizing won't help here. peek is probably seldom a good idea. Anyway, if forEach returned a `long` representing the number of elements processed, there would be fewer attempts to use `peek().count()` ... – user140547 Oct 17 '16 at 10:54
  • @user140547: but in this specific case, the OP isn’t actually interested in the total count. Using `forEach` would still be a victim of the unspecified processing order; using `forEachOrdered` with a stateful `Consumer` can solve the entire problem, of course at the expense of destroying almost any benefit of parallel processing when used with a parallel stream. – Holger Oct 17 '16 at 11:03
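
A minimal sketch of that last point, reusing the answer's `every` helper (re-declared so the block compiles on its own; `everyNthParallel` and the squaring `map` are illustrative assumptions). The `forEachOrdered` Javadoc guarantees that the action for one element happens-before the action for subsequent elements, in encounter order, so even the unsynchronized counter and `ArrayList` pick the right elements. Upstream stages may still run in parallel, but the final stage is effectively sequential.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

class OrderedEvery {
    // The answer's helper: calls `consumer` on every count-th value; stateful, not thread-safe
    static IntConsumer every(int count, IntConsumer consumer) {
        if (count <= 0) throw new IllegalArgumentException("count must be > 0: got " + count);
        return new IntConsumer() {
            private int i;
            @Override
            public void accept(int value) {
                if (++i == count) {
                    consumer.accept(value);
                    i = 0;
                }
            }
        };
    }

    static List<Integer> everyNthParallel(int n, int upTo) {
        List<Integer> picked = new ArrayList<>();
        IntStream.rangeClosed(1, upTo)
                .parallel()
                .map(x -> x * x)                        // upstream stages may run in parallel
                .forEachOrdered(every(n, picked::add)); // actions run one at a time, in encounter order
        return picked;
    }

    public static void main(String[] args) {
        System.out.println(everyNthParallel(5, 20)); // [25, 100, 225, 400]
    }
}
```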

It is not a good idea to rely on peek() and count(), as the peek() action may not be invoked at all if count() can be calculated without going over the whole stream. Even if it works now, that does not mean it will keep working in the future. See the javadoc of Stream.count() in Java 9.

It is better to use forEach().
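
A sketch of that advice applied to the first answer's snippet, with a trimmed copy of its `every` helper (`ForEachEvery` and `everyNth` are hypothetical names). The `peek().count()` line may print nothing on Java 9+, because a `rangeClosed` stream is SIZED and `count()` is allowed to skip traversal; passing the consumer to `forEach` makes it the terminal action, so it cannot be skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

class ForEachEvery {
    // Trimmed copy of the first answer's helper (stateful: sequential streams only)
    static IntConsumer every(int count, IntConsumer consumer) {
        int[] seen = {0};
        return value -> {
            if (++seen[0] == count) {
                seen[0] = 0;
                consumer.accept(value);
            }
        };
    }

    // The action is the terminal operation, so it runs for every element
    static List<Integer> everyNth(int n, int upTo) {
        List<Integer> out = new ArrayList<>();
        IntStream.rangeClosed(1, upTo).forEach(every(n, out::add));
        return out;
    }

    public static void main(String[] args) {
        // Fragile: on Java 9+ count() may be taken from the SIZED range, so peek may never run
        long total = IntStream.rangeClosed(1, 20).peek(every(5, System.out::println)).count();
        System.out.println("count = " + total);

        // Robust: forEach cannot skip its action
        System.out.println(everyNth(5, 20)); // [5, 10, 15, 20]
    }
}
```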

For the problem itself: in special cases like a simple iteration, you could just filter the elements, like this:

    Stream.iterate(2, n -> n + 1)
          .limit(20)
          .filter(n -> (n - 2) % 5 == 0 && n != 2)
          .forEach(System.out::println);

This of course won't work in other cases, where you might use a stateful IntConsumer. If iterate() is used, parallel streams are probably not that useful anyway.
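
When the source is a random-access collection rather than a generator, one stateless alternative (an assumption on my part, not from this answer) is to stream the indices and filter on them, which also stays correct for parallel streams because no mutable counter is involved. `IndexFilter`, `everyNth`, and the sample list are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IndexFilter {
    // Returns the n-th, 2n-th, ... elements (1-based) of a random-access list, with no mutable state
    static <T> List<T> everyNth(List<T> source, int n) {
        return IntStream.range(0, source.size())
                .filter(i -> (i + 1) % n == 0) // keep 1-based positions n, 2n, ...
                .mapToObj(source::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> requests = List.of("r1", "r2", "r3", "r4", "r5", "r6", "r7");
        System.out.println(everyNth(requests, 3)); // [r3, r6]
    }
}
```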

If you want a generic solution, you could also use a "normal" Stream of index/value pairs, which may not be as efficient as an IntStream but should still suffice in many cases:

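    class Tuple { // getters/setters omitted
        int index; // 1-based position in the stream
        int value;
        Tuple(int index, int value) { this.index = index; this.value = value; }
        @Override public String toString() { return "Tuple(" + index + ", " + value + ")"; }
    }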

Then you could do:

    Stream.iterate(new Tuple(1, 2), t -> new Tuple(t.index + 1, t.value * 2))
          .limit(30)
          .filter(t -> t.index % 5 == 0)
          .forEach(System.out::println);

If you have to use peek(), you can also do:

    .peek(t -> { if (t.index % 5 == 0) System.out.println(t); })

Or, if you add these methods:

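    import java.util.function.Consumer;
    import java.util.function.IntConsumer;
    import java.util.function.IntUnaryOperator;
    import java.util.function.UnaryOperator;

    static Tuple initialTuple(int value) {
        return new Tuple(1, value);
    }

    // next tuple: index + 1, value transformed by f
    static UnaryOperator<Tuple> createNextTuple(IntUnaryOperator f) {
        return current -> new Tuple(current.index + 1, f.applyAsInt(current.value));
    }

    // calls consumer with the value of every n-th tuple; the index travels with the element, so no counter state
    static Consumer<Tuple> every(int n, IntConsumer consumer) {
        return tuple -> { if (tuple.index % n == 0) consumer.accept(tuple.value); };
    }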

you can then write (with static imports):

    Stream.iterate(initialTuple(2), createNextTuple(x -> x * 2))
          .limit(30)
          .peek(every(5, System.out::println))
          .forEach(System.out::println);
user140547

Try this.

    int[] counter = {0};
    long result = IntStream.iterate(2, n -> n + 1)
        .filter(Findprimes::isPrime)
        .limit(100)
        .peek(x -> { if (counter[0]++ % 10 == 0) System.out.print(x + " "); })
        .count();

Result:

    2 31 73 127 179 233 283 353 419 467
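
Note that `counter[0]++ % 10 == 0` tests the count *before* incrementing, so it selects the 1st, 11th, 21st, ... primes (hence the leading 2). To get the 10th, 20th, ... instead, compare after incrementing. A sketch of both variants, assuming a trial-division `isPrime` in place of the question's `Findprimes::isPrime`, and using `forEachOrdered` so the action does not depend on `count()` traversing the stream (`CounterOffset` and `pick` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class CounterOffset {
    // Trial-division stand-in for Findprimes::isPrime (assumed)
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // postIncrement == true: 1st, (n+1)th, ... primes; false: nth, 2nth, ... primes
    static List<Integer> pick(int n, int limit, boolean postIncrement) {
        int[] counter = {0};
        List<Integer> out = new ArrayList<>();
        IntStream.iterate(2, k -> k + 1)
                .filter(CounterOffset::isPrime)
                .limit(limit)
                .forEachOrdered(x -> {
                    boolean hit = postIncrement
                            ? counter[0]++ % n == 0  // test, then increment
                            : ++counter[0] % n == 0; // increment, then test
                    if (hit) out.add(x);
                });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(pick(10, 100, true));  // same selection as the answer
        System.out.println(pick(10, 100, false)); // 10th, 20th, ... primes
    }
}
```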