
I have created a parsing library that accepts a provided input and returns a stream of Records. A program then calls this library and processes the results. In my case, my program is using something like

recordStream.forEach(r -> insertIntoDB(r));

One of the types of input that can be provided to the parsing library is a flat file, which may have a header row. The parsing library can therefore be configured to skip a header row. If a header row is configured, the library adds a skip(n) operation to the returned pipeline, e.g.

Files.lines(input).skip(1).parallel().map(r -> createRecord(r));

The parsing library returns the resulting Stream.

But it seems that skip, parallel and forEach do not play nicely together. The end programmer must instead invoke forEachOrdered, but it is poor design to put this requirement on the programmer and expect them to know that they must use forEachOrdered when the input is a file with a header row.

How can I enforce the ordered requirement myself when necessary, within the construction of the returned stream chain, to return a fully functional stream to the program writer, instead of a stream with hidden limitations? Is the answer to wrap the stream in another stream?

ManoDestra
Aaron

2 Answers


forEachOrdered is necessary not because of the skip(), but because your Stream is parallel. Even when the stream is parallel, skip(1) will skip the first element in encounter order, as indicated in the documentation:

While skip() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of n, since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order.
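A small check of that guarantee, assuming a current JRE (Holger's answer below notes the behaviour differed before 1.8u60). It uses IntStream.range as an ordered stand-in for Files.lines and deliberately ends with the unordered forEach; the class and method names are illustrative only.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ParallelSkipDemo {
    // Collects, via the unordered terminal operation forEach, every element
    // that survives skip(1) on an ordered parallel stream of 0..size-1.
    static Set<Integer> survivorsOfParallelSkip(int size) {
        Set<Integer> seen = ConcurrentHashMap.newKeySet(); // thread-safe sink
        IntStream.range(0, size)   // ordered source, like Files.lines()
                .parallel()
                .skip(1)           // must drop the first element in encounter order
                .forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        Set<Integer> seen = survivorsOfParallelSkip(10_000);
        // Element 0 is the one skipped, whichever threads do the work.
        System.out.println(seen.contains(0) + " " + seen.size()); // false 9999
    }
}
```

Only membership is checked here, because forEach makes no promise about the order in which the surviving elements arrive.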

It's clearly documented that forEach doesn't necessarily respect the order. Not using forEachOrdered when you care about the order is just a misuse of the Stream API:

The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism.
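The difference between the two terminal operations can be seen side by side in this sketch (hypothetical class name, in-memory IntStream in place of a file). The same parallel pipeline with skip(1) is drained once with forEachOrdered and once with forEach.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class OrderedSinkDemo {
    // forEachOrdered: each action happens-before the next one, in encounter
    // order, so a plain ArrayList is a safe sink even for a parallel pipeline.
    static List<Integer> drainOrdered(int size) {
        List<Integer> sink = new ArrayList<>();
        IntStream.range(0, size).parallel()
                .skip(1)
                .forEachOrdered(sink::add);
        return sink;
    }

    // forEach: actions may run concurrently and in any order, so the sink
    // must be thread-safe and the resulting order is not predictable.
    static List<Integer> drainUnordered(int size) {
        List<Integer> sink = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, size).parallel()
                .skip(1)
                .forEach(sink::add);
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(drainOrdered(20));   // 1 through 19, in order
        System.out.println(drainUnordered(20)); // same elements, order may vary
    }
}
```

Both calls see the same elements; only forEachOrdered promises to hand them over in encounter order, at the cost of some of the parallel speed-up.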

I would not return a parallel stream from the library. I would return a sequential one (where forEach would respect the order), and let the caller call parallel() and assume the consequences if it wants to.

Using a parallel stream by default is a bad idea.
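A minimal sketch of that design, assuming a hypothetical parse entry point with a caller-supplied record factory (the names parse, hasHeader and createRecord are illustrative, not the asker's actual API). The library hands back a sequential stream, so a plain forEach sees rows in input order.

```java
import java.util.function.Function;
import java.util.stream.Stream;

public class RecordParser {
    // Hypothetical library entry point. It returns a sequential stream;
    // going parallel is left to the caller, who then also owns the choice
    // between forEach and forEachOrdered.
    static <R> Stream<R> parse(Stream<String> lines, boolean hasHeader,
                               Function<String, R> createRecord) {
        Stream<String> body = lines.sequential(); // never parallel by default
        if (hasHeader) {
            body = body.skip(1);                  // drop the header row
        }
        return body.map(createRecord);
    }

    public static void main(String[] args) {
        Stream<String> input = Stream.of("id,name", "1,a", "2,b");
        // Sequential, so plain forEach prints 1,A then 2,B
        parse(input, true, String::toUpperCase).forEach(System.out::println);
    }
}
```

Note that if the caller does call parallel() on the result, the whole pipeline (including the skip) becomes parallel, which is exactly the point at which forEachOrdered becomes the caller's concern.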

JB Nizet
  • Even if I remove `parallel()`, the end programmer will have an unexpected requirement to use `forEachOrdered()` if they add `parallel()`, in some circumstances only. I need skip to work reliably despite whatever the end programmer does or does not do. I keep coming back to using `skip()` and `forEachOrdered()` within the library (only where necessary) to encapsulate this from the end programmer and feeding the results to a new Stream. – Aaron Jun 29 '16 at 17:22
  • It's not unexpected at all. It's clearly documented, and thus expected by anyone knowing how to use the stream API. And it's not in some circumstances only. Every time you use a parallel stream, you can't rely on forEach to respect the order. Just like, every time you execute the same task from multiple threads, you can't expect the tasks to be executed sequentially. Finally, as I said in my answer, `skip()` **does** work reliably, whether the stream is parallel or not. – JB Nizet Jun 29 '16 at 17:27
  • If skip() doesn't reliably skip the first line when the stream is parallel, that means your stream is not ordered. But Files.lines() does return an ordered stream. – JB Nizet Jun 29 '16 at 17:47
  • Further discussion here: http://stackoverflow.com/questions/28259636/is-this-a-bug-in-files-lines-or-am-i-misunderstanding-something-about-paralle – Aaron Jun 29 '16 at 19:43
  • Ah, good find. So it is a bug in the JDK that is now fixed, if I get the answer correctly. – JB Nizet Jun 29 '16 at 19:58

Considering the relevant scenario where

  • the stream source is set up using skip
  • the client code requests parallel() execution
  • the client code chains an unordered terminal action like forEach
  • the code runs on a JRE older than 1.8u60

we have quite a special combination of circumstances, all being outside of the control of the particular library function that will chain the .map(r -> createRecord(r)) operation.

I don’t think the responsibility lies at this point. In general, application code is not responsible for working around things that are already recognized as JRE bugs and fixed in up-to-date versions.

If, for whatever reason, you consider it necessary to provide a workaround for older JREs, it would be up to the stream source that requires the skip operation to do so.

For this specific case, it’s not so hard. You may create the BufferedReader directly, invoke readLine() to skip the first line and then return the result of lines(), which allows processing all remaining lines. That might even be more efficient than a parallel Stream bearing a skip operation.
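A sketch of that approach, assuming UTF-8 input via Files.newBufferedReader(Path); the class and method names are made up for illustration. The header is consumed eagerly, so the returned stream contains no skip() at all, whatever the caller chains onto it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;

public class HeaderSkippingLines {
    // Reads and discards the header with readLine(), then exposes the rest
    // of the file through BufferedReader.lines().
    static Stream<String> linesWithoutHeader(Path file) throws IOException {
        BufferedReader reader = Files.newBufferedReader(file);
        try {
            reader.readLine(); // discard the header row (returns null for an empty file)
        } catch (IOException e) {
            reader.close();
            throw e;
        }
        // Close the reader when the caller closes the stream.
        return reader.lines().onClose(() -> {
            try {
                reader.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("records", ".csv");
        Files.write(tmp, Arrays.asList("header", "r1", "r2"));
        try (Stream<String> s = linesWithoutHeader(tmp)) {
            s.parallel().forEachOrdered(System.out::println); // r1, then r2
        }
        Files.delete(tmp);
    }
}
```

Because the stream owns an open file, callers should use it in try-with-resources, as in main above.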

A more general solution would be an “eager skip first” operation like this:

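import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Stream<T> skipFirstImmediately(Stream<T> source) {
    Spliterator<T> sp = source.spliterator();
    sp.tryAdvance(skipped -> {}); // eagerly consume and discard the first element
    return StreamSupport.stream(sp, source.isParallel()); // rewrap the remainder
}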

Note that when using this method, due to properties of the current Stream implementation, it can be beneficial to make the source Stream parallel before invoking this method rather than making the resulting Stream parallel, if parallel execution is desired.

This can be verified by comparing the output of

skipFirstImmediately(IntStream.range(0, 10).parallel().boxed())
    .peek(x -> System.out.println(Thread.currentThread()))
    .forEach(System.out::println);

and

skipFirstImmediately(IntStream.range(0, 10).boxed()).parallel()
    .peek(x -> System.out.println(Thread.currentThread()))
    .forEach(System.out::println);

Both produce correct results, but the latter does not exploit the machine's SMP capabilities.
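For a file whose header spans several lines, the same idea can be extended to n leading elements. This is a hypothetical generalization (skipImmediately is not a JDK method), and it additionally forwards close() to the original stream so that a file-backed source is still released.

```java
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class EagerSkip {
    // Hypothetical generalization of skipFirstImmediately: eagerly consumes
    // up to n leading elements before the caller chains anything, so the
    // returned pipeline contains no skip() operation.
    static <T> Stream<T> skipImmediately(Stream<T> source, long n) {
        Spliterator<T> sp = source.spliterator();
        long remaining = n;
        while (remaining > 0 && sp.tryAdvance(skipped -> { })) {
            remaining--; // one leading element consumed and discarded
        }
        // Rewrap the rest, keeping the parallel flag and forwarding close().
        return StreamSupport.stream(sp, source.isParallel()).onClose(source::close);
    }

    public static void main(String[] args) {
        // Made parallel before the call, per the note above.
        Stream<String> lines = Stream.of("h1", "h2", "a", "b").parallel();
        System.out.println(skipImmediately(lines, 2).collect(Collectors.toList())); // [a, b]
    }
}
```

If the source holds fewer than n elements, the loop simply stops early and an empty stream is returned.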

Holger