
Problem

Hi, I have a function that is supposed to return an infinite stream of results generated in parallel (yes, parallel generation is much faster in this case). So, obviously (or not), I used

Stream<Something> stream = Stream.generate(this::myGenerator).parallel();

It works, however... it doesn't when I want to limit the results (everything is fine when the stream is sequential). I mean, results are produced when I do something like

stream.peek(System.out::println).limit(2).collect(Collectors.toList())

but even when the peek output shows more than 10 elements, collect still hasn't finished (generation is slow, so producing those 10 can take up to a minute)... and that is the easy example. Actually, limiting the results is a feature, because the main expectation is to keep returning only results better than the recent ones until the user kills the process (another case is to return the first result I can produce, throwing an exception if nothing else helps [findFirst didn't work: even when I had more elements on the console, no result was returned for about 30 seconds]).

So, the question is...

how to cope with that? My idea was also to use RxJava, and there is another question: how to achieve a similar result with that tool (or another one)?

Code sample

public Stream<Solution> generateSolutions() {
    final Solution initialSolution = initialSolutionMaker.findSolution();
    return Stream.concat(
            Stream.of(initialSolution),
            Stream.generate(continuousSolutionMaker::findSolution)
    ).parallel();
}

new Solver(instance).generateSolutions()
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .limit(5).collect(Collectors.toList());

The implementation of findSolution is not important. It has a side effect, like adding to the solutions repo (a synchronized singleton, etc.), but nothing more.

Azbesciak

2 Answers


As explained in the already linked answer, the key point to an efficient parallel stream is to use a stream source that already has an intrinsic size, instead of using an unsized or even infinite stream and applying a limit on it. Injecting a size doesn't work with the current implementation at all, while ensuring that a known size doesn't get lost is much easier. Even if the exact size can't be retained, e.g. when applying a filter, the size will still be carried as an estimate.

So instead of

Stream.generate(this::myGenerator).parallel()
      .peek(System.out::println)
      .limit(2)
      .collect(Collectors.toList())

just use

IntStream.range(0, /* limit */ 2).unordered().parallel()
         .mapToObj(unused -> this.myGenerator())
         .peek(System.out::println)
         .collect(Collectors.toList())

Or, closer to your sample code:

public Stream<Solution> generateSolutions(int limit) {
    final Solution initialSolution = initialSolutionMaker.findSolution();
    return Stream.concat(
            Stream.of(initialSolution),
            IntStream.range(1, limit).unordered().parallel()
                    .mapToObj(unused -> continuousSolutionMaker.findSolution())
    );
}

new Solver(instance).generateSolutions(5)
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .collect(Collectors.toList());
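
A quick way to confirm that the size really is carried through the pipeline is to inspect the estimated size of the stream's spliterator (a minimal check; the class name SizeDemo is made up for this illustration):

import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SizeDemo {
    public static void main(String[] args) {
        // Unsized infinite generator: Long.MAX_VALUE means "size unknown".
        System.out.println(Stream.generate(Math::random)
                .spliterator().estimateSize());
        // Sized range source: the known size of 2 survives mapToObj.
        System.out.println(IntStream.range(0, 2).unordered().parallel()
                .mapToObj(unused -> Math.random())
                .spliterator().estimateSize());
    }
}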
Holger
  • you make this sound so easy... I haven't thought about it. – Eugene Sep 04 '17 at 19:16
  • But there is still one simple problem: limiting is a *feature*, so it has to use the same algorithm. Btw, I solved the problem by just adding `sequential().limit(1)` where I wanted it to be limited. – Azbesciak Sep 05 '17 at 07:35
  • There is no requirement for a “feature” to have the same algorithm as another feature. The fact that `limit` performs badly in parallel execution has been documented, so while the inability to optimize this specific scenario internally might indeed be surprising, it is in line with the documentation. Calling `sequential().limit(1)` is baroque; you can achieve the same by removing `.parallel()` in the first place. Still, a sized stream may perform better than an infinite stream with limit even with sequential execution for some operations, e.g. it makes a difference for `toArray()`… – Holger Sep 05 '17 at 08:09

Unfortunately, this is expected behavior. As I remember, I've seen at least two topics on this matter; here is one of them.

The idea is that Stream.generate creates an unordered infinite stream, and limit will not introduce the SIZED flag. Because of this, when you spawn a parallel execution on that Stream, the individual tasks have to sync their execution to see whether they have reached that limit; by the time that sync happens, multiple elements may already have been processed. For example, this:

Stream.iterate(0, x -> x + 1)
        .peek(System.out::println)
        .parallel()
        .limit(2)
        .collect(Collectors.toList());

and this:

IntStream.of(1, 2, 3, 4)
        .peek(System.out::println)
        .parallel()
        .limit(2)
        .boxed()
        .collect(Collectors.toList());

will always collect exactly two elements into the List (Collectors.toList) and will always output exactly two elements as well (via peek).

On the other hand this:

Stream<Integer> stream = Stream.generate(new Random()::nextInt).parallel();

List<Integer> list = stream
            .peek(x -> {
                System.out.println("Before " + x);
            })
            .map(x -> {
                System.out.println("Mapping x " + x);
                return x;
            })
            .peek(x -> {
                System.out.println("After " + x);
            })
            .limit(2)
            .collect(Collectors.toList());

will collect two elements into the List, but it may process many more that are later discarded by the limit. This is what you are actually seeing in your example.
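
You can make the overshoot visible with a counter (a minimal experiment; the generator here just increments an AtomicInteger, and the exact overshoot is nondeterministic):

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OvershootDemo {
    public static void main(String[] args) {
        AtomicInteger produced = new AtomicInteger();
        List<Integer> result = Stream.generate(produced::incrementAndGet)
                .parallel()
                .limit(2)
                .collect(Collectors.toList());
        System.out.println("collected: " + result.size()); // always 2
        System.out.println("produced: " + produced.get());  // usually far more than 2
    }
}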

The only sane way of going about this (as far as I can tell) would be to create a custom Spliterator. I have not written many of them, but here is my attempt:

import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

static class LimitingSpliterator<T> implements Spliterator<T> {

    // Remaining number of elements this spliterator may still produce.
    private int limit;

    private final Supplier<T> generator;

    private LimitingSpliterator(Supplier<T> generator, int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
        this.generator = Objects.requireNonNull(generator);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> consumer) {
        if (limit == 0) {
            return false;
        }
        T nextElement = generator.get();
        --limit;
        consumer.accept(nextElement);
        return true;
    }

    @Override
    public LimitingSpliterator<T> trySplit() {
        if (limit <= 1) {
            return null;
        }
        // Hand half of the remaining budget to the new spliterator
        // and keep the rest for this one.
        int half = limit >> 1;
        limit = limit - half;
        return new LimitingSpliterator<>(generator, half);
    }

    @Override
    public long estimateSize() {
        // The remaining size is known exactly, so report it as-is
        // (reporting anything else would violate the SIZED contract).
        return limit;
    }

    @Override
    public int characteristics() {
        // Splits keep exact sizes too, hence SUBSIZED as well.
        return SIZED | SUBSIZED;
    }
}

And the usage would be:

StreamSupport.stream(new LimitingSpliterator<>(new Random()::nextInt, 7), true)
        .peek(System.out::println)
        .collect(Collectors.toList());
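
If you use this often, the StreamSupport boilerplate can be hidden behind a small factory method (a sketch; the helper name limitedGenerate is made up for this example):

static <T> Stream<T> limitedGenerate(Supplier<T> generator, int limit) {
    // 'true' requests a parallel stream over the sized spliterator above.
    return StreamSupport.stream(new LimitingSpliterator<>(generator, limit), true);
}

List<Integer> list = limitedGenerate(new Random()::nextInt, 7)
        .peek(System.out::println)
        .collect(Collectors.toList());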
Eugene
  • OK, thank you for the answer. However, that just explained to me why my solution has errors. But, as I mentioned, limiting is just a *feature*; it has to search for solutions indefinitely, so I cannot settle on that solution. Any other idea (reactive, for example)? – Azbesciak Aug 23 '17 at 11:55
  • By the way, the curious thing is that when I add `limit(2)` it returns about 1000 solutions; when I chain it with another `limit(2)`, for example, it returns maybe 8-10 (random), but after another chaining there is no difference... – Azbesciak Aug 23 '17 at 11:59
  • @Azbesciak 1) no, I can't tell if there is a way to do it with `RxJava` and 2) those `limit.limit` results might just be simple `samples` that you get: the behavior is undefined and there are no guarantees for that... – Eugene Aug 23 '17 at 13:09
  • It’s much easier than writing a custom `Spliterator`. Just keep in mind that mapping operations have no impact on the splitability or SIZED characteristics. So the canonical solution always leads to something along `IntStream.range(0, limit).map…(unused -> actualGeneratorFunction)` (or `LongStream…`, respectively)… – Holger Sep 04 '17 at 17:40