
I have some code that looks like this (simplified pseudo-code):

[...]
// stream constructed of series of web service calls
Stream<InputStream> slowExternalSources = StreamSupport.stream(spliterator, false);
[...]

then this

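public Stream<String> getLines(Stream<InputStream> slowExternalSources) {
  // onClose is attached to each inner stream, where `is` is in scope;
  // flatMap closes the inner stream once its lines have been consumed
  return slowExternalSources.flatMap(is -> new BufferedReader(new InputStreamReader(is)).lines()
      .onClose(() -> {
        try { is.close(); } catch (IOException e) { throw new UncheckedIOException(e); }
      }));
}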

and later this

Stream<String> lineStream = getLines(slowExternalSources);
lineStream.parallel().forEach(line -> { ... do some fast CPU-intensive stuff here ... });

I've been struggling to make this code execute with some level of parallelism.

Inspection with jps/jstack/jmc shows that all the InputStream reading occurs in the main thread and is not parallelized at all.

Possible culprits:

  • BufferedReader.lines() uses a Spliterator with parallel=false to construct the stream (source: see Java sources)

  • I think I read an article that said flatMap does not interact well with parallel(). I am not able to locate that article right now.
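
A minimal sketch for checking both suspicions in isolation: it records the name of every thread on which a line is actually read inside the flatMap. The class name `ReaderThreadProbe`, the helper `readerThreads`, and the in-memory `ByteArrayInputStream` sources standing in for the web service calls are all assumptions made for illustration, not part of the real code.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ReaderThreadProbe {

    // Hypothetical helper: returns the names of the threads on which lines were read.
    static Set<String> readerThreads(Stream<InputStream> sources) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        sources.flatMap(is -> new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))
                        .lines()
                        // peek is part of the inner stream, so it runs on the thread doing the reading
                        .peek(line -> names.add(Thread.currentThread().getName())))
                .parallel()
                .forEach(line -> { /* CPU-bound work would go here */ });
        return names;
    }

    public static void main(String[] args) {
        // In-memory stand-ins for the slow external sources (assumption).
        Stream<InputStream> sources = IntStream.range(0, 8)
                .<InputStream>mapToObj(i ->
                        new ByteArrayInputStream("x\ny\nz\n".getBytes(StandardCharsets.UTF_8)));
        // The printed thread names vary from run to run and between JVMs.
        System.out.println(readerThreads(sources));
    }
}
```

If only `main` shows up in the printed set, the reading is confined to the calling thread; several `ForkJoinPool.commonPool-worker-*` names would mean the outer stream is being split across sources.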

How can I fix this code so that it runs in parallel?

I would like to retain the Java8 Streams if possible, to avoid rewriting existing code that expects a Stream.

NOTE I added java.util.concurrent to the tags because I suspect it might be part of the answer, even though it's not part of the question.

Alex R
  • 1. `.onClose(is -> is.close())` how does this compile? 2. I believe the `BaseStream.onClose` already is *AutoCloseable* 3. `parallel` streams mostly use the FJP unless specified otherwise. 4. Why not collect to a List and do the CPU intensive stuff using a custom `ExecutorService` and multiple threads? – Naman Oct 23 '19 at 16:41
  • See this answer: https://stackoverflow.com/questions/34341656/why-is-files-list-parallel-stream-performing-so-much-slower-than-using-collect/34348820#34348820 TL;DR: unbounded streams do not parallelise well with spliterators. Dump the lines into List and then parallel stream it – diginoise Oct 23 '19 at 16:47
  • @Naman I just copied and pasted pieces of the actual code to post here, apologies if that close() is in the wrong place – Alex R Oct 23 '19 at 16:49
  • @diginoise dumping to a List causes OOM, these are huge datasets – Alex R Oct 23 '19 at 16:51
  • @AlexR I'm afraid you'll need a more old-fashioned solution... what it will look like depends on whether you need the lines to be processed in order or not. – maaartinus Oct 23 '19 at 23:13
  • The lines don’t need to be in order. I want to use Streams for this so that I can learn Streams really well. – Alex R Oct 23 '19 at 23:39
  • Besides the fact that `.onClose(is -> is.close())` doesn’t work at all (a `Runnable` doesn’t take arguments), it’s obsolete, as the stream returned by the `flatMap` function is already closed automatically. The parallel processing depends on the `Stream`, not the sub-streams. If it doesn’t contain enough elements for parallel processing, don’t use a nested stream. – Holger Oct 24 '19 at 07:48
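
The suggestions in the comments (a custom `ExecutorService`, no nested stream, unordered processing) could be sketched roughly as follows: one task per source, each reading and processing its own lines, so no list of lines is ever built. The names `PerSourceTasks` and `forEachLine`, the pool size, and the in-memory sources are hypothetical; this is a sketch under those assumptions, not a drop-in fix.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class PerSourceTasks {

    // Hypothetical helper: reads each source on a pool thread and passes every line to `action`.
    // `action` is called concurrently from several threads, so it must be thread-safe.
    static void forEachLine(Stream<InputStream> sources, int threads, Consumer<String> action)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            // The outer stream is walked sequentially; only reading and processing run on the pool.
            sources.sequential().forEach(is -> pending.add(pool.submit(() -> {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(is, StandardCharsets.UTF_8))) {
                    reader.lines().forEach(action);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            })));
            for (Future<?> f : pending) {
                f.get(); // waits for completion and rethrows any task failure
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // In-memory stand-ins for the slow web service responses (assumption).
        Stream<InputStream> sources = Stream.of("a\nb\n", "c\n", "d\ne\nf\n")
                .<InputStream>map(s -> new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8)));
        AtomicLong processed = new AtomicLong();
        forEachLine(sources, 4, line -> processed.incrementAndGet());
        System.out.println(processed.get()); // prints 6
    }
}
```

This trades the `Stream<String>` return type for a callback, so existing callers that expect a Stream would need adapting. It also submits every source up front, which means all InputStreams are open at the same time; with a very large number of sources a bounded queue or a semaphore would be needed on top of this.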

0 Answers