The goal is to process a continuous stream of elements using Java 8 streams. To that end, elements are added to the data source of a parallel stream while that stream is being processed.
The Javadoc of the java.util.stream package states the following in its section "Non-interference":
For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception to this are streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic.
That is why a ConcurrentLinkedQueue is used in our attempts; the following expression returns true for it:
new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)
It is not explicitly stated that the data source must not be modified when it is used in a parallel stream.
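To make that check concrete, here is a minimal sketch that compares a ConcurrentLinkedQueue with an ordinary ArrayList. The class name CharacteristicsCheck and the helper isConcurrentSource are made up for illustration; only spliterator() and hasCharacteristics(...) come from the JDK.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CharacteristicsCheck {

    // Hypothetical helper: true if the collection's spliterator reports CONCURRENT.
    public static boolean isConcurrentSource(Collection<?> source) {
        return source.spliterator().hasCharacteristics(Spliterator.CONCURRENT);
    }

    public static void main(String[] args) {
        // A concurrent collection versus a plain list, checked the same way.
        System.out.println("ConcurrentLinkedQueue: "
                + isConcurrentSource(new ConcurrentLinkedQueue<Integer>()));
        System.out.println("ArrayList: "
                + isConcurrentSource(new ArrayList<Integer>()));
    }
}
```

The characteristic only says that the source tolerates concurrent modification. As far as we can tell, it does not by itself promise that elements added later will be delivered to a running pipeline.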
In our example, for each element in the stream an incremented counter value is added to the queue, which is the data source of the stream, until the counter exceeds N. With queue.stream(), everything works fine in sequential execution:
import static org.junit.Assert.assertEquals;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {

    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testSequential(N));
    }

    public static int testSequential(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        Stream<Integer> stream = queue.stream();
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}
In the second attempt the stream is parallel, and the test throws a java.lang.AssertionError because check is smaller than N: not every element in the queue was processed. The stream may have finished early because, from its point of view, the queue was empty at some point in time.
import static org.junit.Assert.assertEquals;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {

    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testParallel1(N));
    }

    public static int testParallel1(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}
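The suspicion that the stream stops once it has seen the end of the queue can be probed in isolation, directly on the queue's Spliterator. The following is only a sketch under assumptions: the class name SpliteratorExhaustionDemo and its methods are made up for illustration, and it exercises a single spliterator on one thread, not the actual parallel pipeline. It checks two things: whether an element added while traversal is still in progress is delivered, and whether a spliterator that has already reached the end delivers an element added afterwards.

```java
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SpliteratorExhaustionDemo {

    // Counts the elements the spliterator still delivers.
    private static int drain(Spliterator<Integer> spliterator) {
        AtomicInteger seen = new AtomicInteger();
        spliterator.forEachRemaining(e -> seen.incrementAndGet());
        return seen.get();
    }

    // Adds an element while traversal is in progress and counts everything delivered.
    public static int seenWhenAddedDuringTraversal() {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= 4; i++) {
            queue.add(i);
        }
        Spliterator<Integer> spliterator = queue.spliterator();
        int seen = spliterator.tryAdvance(e -> { }) ? 1 : 0; // consume the first element
        queue.add(5);                                        // added before the end was reached
        return seen + drain(spliterator);
    }

    // Adds an element after the spliterator has reached the end of the queue.
    public static boolean seesElementAddedAfterEnd() {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= 4; i++) {
            queue.add(i);
        }
        Spliterator<Integer> spliterator = queue.spliterator();
        drain(spliterator);                        // traversal runs off the end here
        queue.add(5);                              // added too late for this spliterator?
        return spliterator.tryAdvance(e -> { });   // true only if the late element is delivered
    }

    public static void main(String[] args) {
        System.out.println("delivered during traversal: " + seenWhenAddedDuringTraversal());
        System.out.println("late element delivered: " + seesElementAddedAfterEnd());
    }
}
```

From reading the OpenJDK sources we would expect the first method to return 5 and the second to return false, but the spliterator is only documented as weakly consistent, so this is implementation behaviour and not a guarantee. If it holds, a worker in the parallel case that reaches the end of the queue before the other workers have added their elements would simply stop, which would match the early termination we observe.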
The next attempt was to signal the main thread once the continuous stream has ‘really’ ended (the queue is empty) and to close the stream object afterwards. The problem here is that the stream object appears to read elements from the queue only once, or at least not continuously, and never reaches the ‘real’ end of the stream.
import static org.junit.Assert.assertEquals;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

public class StreamTest {

    public static void main(String[] args) {
        final int N = 10000;
        try {
            assertEquals(N, testParallel2(N));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static int testParallel2(int N) throws InterruptedException {
        final Lock lock = new ReentrantLock();
        final Condition cond = lock.newCondition();
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            lock.lock();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            } else {
                cond.signal();
            }
            lock.unlock();
        });
        lock.lock();
        while (check.get() < N) {
            cond.await();
        }
        lock.unlock();
        stream.close();
        return check.get();
    }
}
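One point that may matter for this attempt: forEach is a terminal operation, and the call only returns after the pipeline has finished, in the parallel case as well. The sketch below shows just that, on an unrelated IntStream; the class name TerminalOperationDemo and its method are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class TerminalOperationDemo {

    // Returns how many elements had been processed at the moment forEach returned.
    public static int processedWhenForEachReturns(int n) {
        AtomicInteger processed = new AtomicInteger();
        // forEach blocks the calling thread until every element has been handled.
        IntStream.range(0, n).parallel().forEach(i -> processed.incrementAndGet());
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processedWhenForEachReturns(1000));
    }
}
```

If this reading is right, the lock/await section in testParallel2 is reached only after the stream has stopped processing. Should check still be below N at that point, no thread is left to call cond.signal(), and the main thread would wait indefinitely.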
This raises the following questions:
- Did we do something wrong?
- Is it an unspecified or even wrong usage of the Stream API?
- How can we achieve the desired behavior otherwise?