
I am trying to write a method that finds the indices of an object in a list of lists and takes advantage of parallelism. Here is my code.

// returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
public static int[] indices(List<? extends List<?>> lists, Object o) {
    return IntStream.range(0, lists.size())
                    .boxed()
                    .flatMap(i -> IntStream.range(0, lists.get(i).size()).mapToObj(j -> new int[]{i, j}))
                    .parallel()
                    .filter(a -> {
                        System.out.println(Arrays.toString(a));     // For testing only
                        return Objects.equals(o, lists.get(a[0]).get(a[1]));
                    })
                    .findAny()
                    .orElse(null);
}

When I run the following code

List<List<String>> lists = Arrays.asList(
        Arrays.asList("A", "B", "C"),
        Arrays.asList("D", "E", "F", "G"),
        Arrays.asList("H", "I"),
        Collections.nCopies(5, "J")
);
System.out.println("Indices are " + Arrays.toString(indices(lists, "J")));

the output is something like

[0, 0]
[0, 1]
[0, 2]
[3, 0]
[3, 1]
[3, 2]
[3, 3]
[2, 0]
[3, 4]
[1, 0]
[1, 1]
[2, 1]
[1, 2]
[1, 3]
Indices are [3, 0]

In other words, the search continues even after the object has been found. Isn't findAny supposed to be a short-circuiting operation? What am I missing? Also, what is the best way to take advantage of parallelism when iterating over a list of lists or a jagged array?

EDIT

Following the idea in @Sotirios's answer, I got the following output:

Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 0]
Thread[main,5,main] [2, 0]
Thread[main,5,main] [2, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 0]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 2]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 3]
Thread[main,5,main] [0, 0]
Thread[main,5,main] [0, 1]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 1]
Thread[main,5,main] [0, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 3]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 4]
Indices are [3, 0]

Notice that

Thread[ForkJoinPool.commonPool-worker-3,5,main]

continues searching even after the answer is found.

Paul Boddington

3 Answers


Short-circuiting operations do not guarantee to pull only as many elements as it takes to produce their result. They may do so, but it is not required.

The current (Java 8) implementation of flatMap always pushes the substream's entire contents downstream. So even if your stream weren't parallel, you could see more elements flow through the stream than it takes to satisfy findAny.
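Here is a small sequential sketch of the flatMap behaviour described above, with no parallelism involved. The class and method names are made up for illustration. It uses peek to count how many elements reach the stage after flatMap before findFirst is satisfied.

On Java 8, flatMap hands the whole first sublist to forEach, so all three elements are counted. Later JDKs (JDK 10, if I remember correctly) changed flatMap to check for cancellation, so there you may see only one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class FlatMapEagerDemo {

    // Counts how many elements pass the peek() placed after flatMap
    // before findFirst() is satisfied. The stream is sequential on purpose.
    static int countElementsSeen() {
        List<List<String>> lists = Arrays.asList(
                Arrays.asList("A", "B", "C"),
                Arrays.asList("D", "E"));
        AtomicInteger seen = new AtomicInteger();
        Optional<String> result = lists.stream()
                .flatMap(List::stream)
                .peek(s -> seen.incrementAndGet()) // records every element reaching this stage
                .filter("A"::equals)
                .findFirst();
        if (!result.isPresent()) {
            throw new IllegalStateException("\"A\" should have been found");
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // Java 8: the whole first sublist is pushed, so this prints 3.
        // JDKs where flatMap honours cancellation may print 1.
        System.out.println("elements seen: " + countElementsSeen());
    }
}
```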

Misha
  • It appears that this answer is right, and that `flatMap().filter().findAny()` is basically not short-circuiting. I don't know why it would be implemented this way. – Paul Boddington Nov 21 '15 at 03:09
  • "Short-circuiting" merely means that it *may* terminate before examining the entire stream. It doesn't make any guarantees beyond that. – Misha Nov 21 '15 at 03:12
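The java.util.stream package documentation defines a short-circuiting terminal operation as one that, when presented with infinite input, *may* terminate in finite time. Here is a minimal sketch of that definition using an unbounded Stream.iterate; the class name is hypothetical. The spec says nothing about how many elements get pulled along the way, only that the pipeline is allowed to stop.

```java
import java.util.Optional;
import java.util.stream.Stream;

public class ShortCircuitInfiniteDemo {

    // Stream.iterate produces an unbounded stream. findFirst() is short-circuiting,
    // so the pipeline is allowed to stop once a matching element has been found.
    static Optional<Integer> firstMultipleOfSeven() {
        return Stream.iterate(1, n -> n + 1)
                .filter(n -> n % 7 == 0)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(firstMultipleOfSeven().get()); // prints 7
    }
}
```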

As for "why it was implemented this way": the problem lies deep in the Stream API implementation. The flatMap body often creates a stream with some intermediate operations (like .flatMap(list -> list.stream().map(...).filter(...))). Inside the flatMap implementation, one could call stream.spliterator() and then call tryAdvance repeatedly until cancellation is requested. However, when the stream contains intermediate operations, spliterator() returns a somewhat artificial spliterator (if it contains none, it just returns the original stream's spliterator). This artificial spliterator's tryAdvance() implementation is not very efficient, so using it might be considered a worse performance drawback than consuming the whole flatMapped stream. In many cases you flatMap to short streams, so the current implementation may actually give you a performance gain.
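Here is a rough sketch of the two strategies described above, using hypothetical helpers named pushAll and pullUntil.

* pushAll pushes a substream's elements with forEach. This is roughly what Java 8's flatMap does with each substream.
* pullUntil pulls through spliterator().tryAdvance and checks a stop condition before each element.

The sample substream has map and filter stages, so spliterator() here returns the wrapping spliterator the answer mentions. The sketch only counts elements delivered; it makes no performance claims.

```java
import java.util.Spliterator;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PushVersusPull {

    // A substream with intermediate operations, so spliterator() must wrap the pipeline.
    // Elements: 2, 4, 8, 10, 14, 16, 20
    static Stream<Integer> sample() {
        return IntStream.rangeClosed(1, 10).boxed()
                .map(i -> i * 2)
                .filter(i -> i % 3 != 0);
    }

    // Push: forEach delivers every element; there is no way to stop once match succeeds.
    static <T> int pushAll(Stream<T> sub, Predicate<? super T> match) {
        int[] seen = {0};
        boolean[] found = {false};
        sub.forEach(t -> {
            seen[0]++;
            if (match.test(t)) {
                found[0] = true; // recorded, but forEach keeps going
            }
        });
        return seen[0];
    }

    // Pull: request one element at a time and check the stop condition before each request.
    static <T> int pullUntil(Stream<T> sub, Predicate<? super T> match) {
        Spliterator<T> it = sub.spliterator();
        boolean[] found = {false};
        int seen = 0;
        while (!found[0] && it.tryAdvance(t -> found[0] = match.test(t))) {
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("push delivered " + pushAll(sample(), i -> i == 8)); // 7
        System.out.println("pull delivered " + pullUntil(sample(), i -> i == 8)); // 3
    }
}
```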

Tagir Valeev

It's not that it continues; it's that it has already dispatched all sorts of threads to try to find the result, and it will wait until those have completed before returning the result.

In other words, the findAny terminal operation will submit the "search" task to a number of threads. These tasks simply apply the filter Predicate and return when something returns true. findAny, presumably, waits for one of these to return a value. There's no way for it to really cancel anything it has already submitted, and this implementation seems to block until the entire batch returns. It can only stop submitting future batches.

You can verify this by logging the current thread:

System.out.println(Thread.currentThread() + " " + Arrays.toString(a)); // For testing only
Sotirios Delimanolis
  • I'm half asleep, so this is probably a stupid question, but if a load of worker threads are given tasks up front, and the whole method cannot return until they all finish, what does short-circuiting even mean? – Paul Boddington Nov 21 '15 at 02:45
  • @PaulBoddington I don't think it's _all_, I think it's some subset. – Sotirios Delimanolis Nov 21 '15 at 02:51
  • @PaulBoddington For example, I fire off 5 threads to search. All 5 might return a result. But I have to wait for all 5 before I can decide. (Well you really only have to wait for one, but you can't cancel the others. And this implementation seems to want to join on all those 5 tasks.) – Sotirios Delimanolis Nov 21 '15 at 03:00
  • @PaulBoddington I think that's a consequence of the `flatMap` as Misha states. I gotta run for my bus, brb. – Sotirios Delimanolis Nov 21 '15 at 03:06
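The second part of the question asks how to parallelise a search over a list of lists. One workaround that avoids flatMap altogether is to parallelise over the outer index only and scan each inner list sequentially with List.indexOf. This is my own sketch, not something proposed in the answers above. OuterParallelSearch and SAMPLE are illustrative names, and the test data is taken from the question.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;

public class OuterParallelSearch {

    // The data from the question.
    static final List<List<String>> SAMPLE = Arrays.asList(
            Arrays.asList("A", "B", "C"),
            Arrays.asList("D", "E", "F", "G"),
            Arrays.asList("H", "I"),
            Collections.nCopies(5, "J"));

    // Returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
    // Parallelism is over the outer index only; each inner list is scanned
    // sequentially by indexOf, so no flatMap is involved.
    static int[] indices(List<? extends List<?>> lists, Object o) {
        return IntStream.range(0, lists.size())
                .parallel()
                .mapToObj(i -> {
                    int j = lists.get(i).indexOf(o); // -1 if o is not in this sublist
                    return j < 0 ? null : new int[]{i, j};
                })
                .filter(Objects::nonNull)
                .findAny()
                .orElse(null);
    }

    public static void main(String[] args) {
        System.out.println("Indices are " + Arrays.toString(indices(SAMPLE, "J"))); // [3, 0]
    }
}
```

The trade-off is that work is split per sublist. With only a few outer lists, or one list much longer than the rest, the load balance can be poor. Tasks already running when a match is found still finish their own indexOf, but each one stops at its sublist's first match. The same shape should work for a jagged array by replacing indexOf with a loop over the row.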