I'm struggling a bit with how and when completable futures are completed. I have created this test case:
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StreamOfCompletableFuturesTest {
@Test
public void testList() {
completeFirstTwoElements(
Stream.of("list one", "list two", "list three", "list four", "list five")
);
}
@Test
public void testIterator() {
Iterator<String> iterator = Arrays.asList("iterator one", "iterator two", "iterator three", "iterator four", "iterator five").iterator();
completeFirstTwoElements(
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
);
}
private void completeFirstTwoElements(Stream<String> stream) {
stream
.map(this::cf)
.limit(2)
.parallel()
.forEach(cf -> {
try {
System.out.println(cf.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
private CompletableFuture<String> cf(String result) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Running " + result);
return result;
});
}
}
And the output is:
Running list one
Running list two
list two
list one
Running iterator one
Running iterator two
Running iterator three
Running iterator four
Running iterator five
iterator two
iterator one
The testList
method works as expected. The CompletableFuture
's are only evaluated at the very end, so after the limit method has only kept the first two items.
However, the testIterator
method is unexpected. All CompletableFuture
's are completed and the limiting is only done afterwards.
If I remove the parallel()
method from the stream it works as expected. However, the processing (the forEach()
) should be done in parallel because in my full program it is a long-running method.
Can any one explain why this is happening?
It looks like this depends on the Java version, so I'm on 1.8:
$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)