
I'm writing some code that reads log lines and processes that data in the background. This processing would probably benefit from parallelization, such as what the Stream.parallel methods offer, so I tried to use it. This is the code I started out with, and it works perfectly:

public static void main(String[] args) {
    try {
        final Socket socket = new Socket(ADDRESS, PORT);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        socket.getOutputStream().write(QUERY);
        reader.lines().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }

}

This code connects and prints out all of my data. I would very much like to restructure this code as follows:

public static void main(String[] args) {
    try (Socket socket = new Socket(ADDRESS, PORT); 
         BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
        socket.getOutputStream().write(QUERY);
        reader.lines().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }

}

But sadly this doesn't work. Even worse, going back to the original code, this doesn't even work:

public static void main(String[] args) {
    try {
        final Socket socket = new Socket(ADDRESS, PORT);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        socket.getOutputStream().write(QUERY);
        reader.lines().parallel().forEach(System.out::println);
    } catch (IOException e) {
        e.printStackTrace();
    }

}

All I added here was a .parallel() call, and now nothing works at all. It just sits there and prints nothing.

I can live without the second version, which uses try-with-resources (try (A a = new A()) {}), since it doesn't look very good in this case anyway. What I can't live without is figuring out why this .parallel() call breaks everything.

I'm assuming the try-with-resources statement closes the streams as soon as execution leaves the block (right after the forEach starts), so they are closed and GC'd before any processing happens. But I can't for the life of me figure out what the hell is going on with the .parallel() call.

As requested, here is the jstack output for the .parallel() version of this code:

Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):

"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007fd4f4001000 nid=0x4907 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007fd5280be000 nid=0x48d2 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fd5280bb000 nid=0x48d1 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fd5280b9800 nid=0x48d0 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fd5280b6800 nid=0x48cf waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fd5280b5000 nid=0x48ce runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fd528082000 nid=0x48cd in Object.wait() [0x00007fd515c6d000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fd52807d800 nid=0x48cc in Object.wait() [0x00007fd515d6e000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
    - locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007fd528008000 nid=0x48c2 runnable [0x00007fd52fd9f000]
   java.lang.Thread.State: RUNNABLE
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    - locked <0x00000000ec086790> (a java.net.SocksSocketImpl)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at com.gravypod.Test.main(Test.java:48)

"VM Thread" os_prio=0 tid=0x00007fd528075800 nid=0x48ca runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fd52801d800 nid=0x48c4 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fd52801f000 nid=0x48c5 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fd528021000 nid=0x48c6 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fd528022800 nid=0x48c7 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007fd5280c0800 nid=0x48d3 waiting on condition 

JNI global references: 18

Test.java:48 is the Socket socket = new Socket line. Below is the output for the fully working non-parallel code (just using .lines()):

Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):

"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007f9048001000 nid=0x4982 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007f90800be800 nid=0x496f runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f90800bb000 nid=0x496e waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f90800b9800 nid=0x496d waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f90800b6800 nid=0x496c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f90800b5000 nid=0x496b runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f9080082000 nid=0x496a in Object.wait() [0x00007f907018d000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f908007d800 nid=0x4969 in Object.wait() [0x00007f907028e000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
    - locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007f9080008000 nid=0x4961 runnable [0x00007f90884c3000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    - locked <0x00000000ec08e890> (a java.io.InputStreamReader)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    - locked <0x00000000ec08e890> (a java.io.InputStreamReader)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at java.io.BufferedReader$1.hasNext(BufferedReader.java:571)
    at java.util.Iterator.forEachRemaining(Iterator.java:115)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at com.gravypod.Test.main(Test.java:51)

"VM Thread" os_prio=0 tid=0x00007f9080075800 nid=0x4968 runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f908001d800 nid=0x4963 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f908001f000 nid=0x4964 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f9080021000 nid=0x4965 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f9080022800 nid=0x4966 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f90800c1000 nid=0x4970 waiting on condition 

JNI global references: 319

The line Test.java:51 is the reader.lines().forEach line.

Joshua Katz
  • Seems like a deadlock or livelock problem. – Bruno Jan 03 '17 at 16:39
  • What are the relative speeds of (a) reading all the data from the socket and (b) doing all the processing on the various "lines" you read from the socket? If (a) takes longer than, or not much less than, (b), you can't effectively parallelize this, since (a) is inherently serial and will be the bottleneck. If (b) is long, you can do it, but break it into two steps: reading all the lines (single-threaded) and processing the lines in parallel. Then you can overlap them a bit to extract a bit more performance. – BeeOnRope Jan 03 '17 at 16:40
  • To diagnose your deadlock issue, take a `jstack` of the process when it hangs and post it. It may also be easier to replace the socket+buffered reader with a `StringReader` and see if the problem still exists, since that would let you post an [MVCE](http://stackoverflow.com/help/mcve). – BeeOnRope Jan 03 '17 at 16:42
  • @BeeOnRope effectively, I am reading as fast as data comes over the network, while the processing involves matching against multiple regexes, pulling out data and parsing it, running 3-5 HTTP queries against some APIs, and then pushing the data into an SQL server. – Joshua Katz Jan 03 '17 at 16:43
  • So what's the answer? How long does the reading typically take? How long does the subsequent processing typically take? I can't tell from the description, since the network could reasonably deliver anywhere between 500 Kbps and 5,000,000 Kbps, and the processing could vary over a similar range. – BeeOnRope Jan 03 '17 at 16:46
  • Also, is the example that's hanging the _actual_ example above with `System.out::println`, or is the method in `forEach()` much more complicated? If the latter, we don't really have a hope of diagnosing it. – BeeOnRope Jan 03 '17 at 16:47
  • In your original example, the one that works, does the main() method ever finish? In other words, does the server close the socket, or does the client wait forever for the next line? – Lucian Jan 03 '17 at 16:47
  • @BeeOnRope it prints nothing, it just hangs. It works perfectly fine with a StringReader. I'll install jstack and do some probing. – Joshua Katz Jan 03 '17 at 16:48
  • @Lucian no, this will be reading "forever" or at least until a human intervenes. – Joshua Katz Jan 03 '17 at 16:49
  • I have the feeling you are not showing the complete picture here. The parallel() version blocks on the socket connect, which could easily be explained by the server only supporting a single connection while you still have the other version running. If you have multiple parallel() versions in the same JVM, they will fill up the common ForkJoinPool and eventually block. Can you share the >30 other lines in your class? – k5_ Jan 03 '17 at 19:41
  • @k5_ the only thing different about my code, at all, is that I'm passing each line to a function `public static void onLine(String line)` instead of printing it. In there I'm splitting the string, pulling out some data, and printing the data. This is a simple prototype. Also, the old version isn't running. Edit: there is also no other .parallel() in this project. This is all that I am running. I *have* included a representative sample of my code in my post. – Joshua Katz Jan 04 '17 at 02:09
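A minimal sketch of the `StringReader`-based reproduction suggested in the comments, assuming Java 8+. The class and method names (`ParallelLinesMcve`, `sampleInput`, `processInParallel`) are illustrative, not from the question. Because the input is finite, the parallel `forEach` reaches end-of-stream and returns, which is consistent with the asker's report that the `StringReader` version works; it does not reproduce the endless socket.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelLinesMcve {

    // Builds a finite, in-memory stand-in for the socket data: "line 0" .. "line N-1".
    static String sampleInput(int lineCount) {
        return IntStream.range(0, lineCount)
                .mapToObj(i -> "line " + i)
                .collect(Collectors.joining("\n"));
    }

    // Same pipeline as the question (lines().parallel().forEach(...)), but the
    // lines are collected into a thread-safe queue instead of being printed.
    static ConcurrentLinkedQueue<String> processInParallel(String text) {
        ConcurrentLinkedQueue<String> seen = new ConcurrentLinkedQueue<>();
        // A StringReader holds no OS resources, so it is not closed here.
        BufferedReader reader = new BufferedReader(new StringReader(text));
        reader.lines().parallel().forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> seen = processInParallel(sampleInput(5000));
        System.out.println(seen.size() + " lines processed");
    }
}
```

The queue replaces `System.out::println` only so the result can be checked; the stream pipeline itself is the same as in the question.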

2 Answers


I imagine that parallel(), or the forEach() on a parallel stream, waits to read all the input before parallelizing the task. Because the server never closes the connection, it will wait forever.

Your task is not really parallelizable. Data comes sequentially over the wire, so reading it in parallel cannot work.

Lucian
  • That is not the case here, as .lines() does not block to read all input. Its Javadoc even says `Returns a Stream, the elements of which are lines read from this BufferedReader. The Stream is lazily populated, i.e., read only occurs during the terminal stream operation.` – Joshua Katz Jan 03 '17 at 16:59
  • I'd also like to add that .lines() works perfectly; I just don't want to spend time on my processing instead of reading from the stream, which I need to do constantly (the buffer fills very quickly). It's not so much about parallelizing the work as about not spending time on it that I could spend reading. I have thought through the design and this is how I'd like to do it; my issue is _why_ this isn't working, because unless I'm misunderstanding something this should be perfectly fine. – Joshua Katz Jan 03 '17 at 17:01
  • It's the forEach that blocks. It will wait for all threads to finish, but some of them will never finish since the socket is never closed. Try closing the socket on the server, and you'll see the result. – Lucian Jan 03 '17 at 17:09
  • How do I make this happen in parallel, then? How do I apply a computation in parallel? – Joshua Katz Jan 03 '17 at 17:13
  • @Lucian - in that case it would block in the non-parallel case too - if the underlying InputStream never terminates (and blocks instead), `lines()` will _always_ ultimately block, parallel or not. – BeeOnRope Jan 03 '17 at 17:21
  • You read the lines sequentially, like in your first example. You submit them to a ThreadPool to be executed (i.e. you create a Runnable that does something with a line). Depending on how many threads you have in your ThreadPool, they will be handled in parallel. – Lucian Jan 03 '17 at 17:24
  • @BeeOnRope: that’s acknowledged by [this comment](http://stackoverflow.com/questions/41448061/using-bufferedreader-lines-breaks-in-parallel#comment70103079_41448061). Though, there is a technical difference between “blocking (forever)” and “waiting very long (until the OP gives up)”. Since the stream source is inherently sequential, the underlying implementation will try to buffer elements to enable parallel processing and the number it tries to buffer is insanely large. See [Reader#lines() parallelizes badly due to nonconfigurable batch size…](http://stackoverflow.com/q/22569040/2711488) – Holger Jan 03 '17 at 19:48
  • @Holger can you post that as a response so I can mark your answer as correct? The issue you've linked seems to be it. – Joshua Katz Jan 04 '17 at 02:15
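A minimal sketch of the approach described in the comments above (read lines sequentially on one thread, hand each line to a thread pool), assuming Java 8+. `onLine` here is a hypothetical stand-in for the asker's processing, and a `StringReader` replaces the socket so that the example terminates.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReadThenSubmit {

    // Hypothetical stand-in for the asker's onLine(String): here it only counts lines.
    static void onLine(String line, AtomicInteger processed) {
        processed.incrementAndGet();
    }

    // Reads lines one at a time on the calling thread and hands each line to the
    // pool, so the reading loop does not wait for processing to finish.
    static int readAndDispatch(BufferedReader reader, int threads)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger processed = new AtomicInteger();
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                final String current = line; // lambdas need an effectively final copy
                pool.execute(() -> onLine(current, processed));
            }
        } finally {
            pool.shutdown(); // stop accepting tasks; already-queued ones still run
        }
        // Only reached at end-of-stream (never, for a socket that stays open).
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\nd"));
        System.out.println(readAndDispatch(reader, 4) + " lines processed");
    }
}
```

`Executors.newFixedThreadPool` uses an unbounded work queue, so if lines arrive faster than the pool can process them, pending tasks accumulate in memory; a `ThreadPoolExecutor` with a bounded queue would be one way to apply back-pressure.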

It seems your application isn’t technically hanging, but just waiting for a lot of input before performing observable work. This is a combination of two implementation details. When you start a parallel stream operation, it will first try to split the workload until every CPU core has something to do, before actually starting to process elements. This combines with the issue described in “Reader#lines() parallelizes badly due to nonconfigurable batch size”.

Simply said, when a Stream has an unknown size, the implementation will try to buffer batches whose sizes are multiples of 1024, growing on each split. This great answer shows how the splitting happens for a stream of unknown size on multiple cores, and that multiples of 1024 elements get buffered in the process. This can take a very long time before the consumer passed to forEach is ever invoked.
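A small sketch, assuming OpenJDK 8 or later, that makes this batching visible without a socket. It builds the same kind of spliterator that `BufferedReader.lines()` uses internally (`Spliterators.spliteratorUnknownSize` over an iterator) on top of a hypothetical endless, counting iterator, then calls `trySplit()` directly. The batch sizes are an implementation detail of OpenJDK's iterator-based spliterator rather than a documented guarantee, and a real parallel `forEach` decides on its own how often to split.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;

public class BatchSizeDemo {

    // Hypothetical endless source that counts how many elements have been pulled,
    // standing in for a socket that never reaches end-of-stream.
    static final class CountingIterator implements Iterator<String> {
        long pulled = 0;

        @Override public boolean hasNext() { return true; }

        @Override public String next() { return "line " + (pulled++); }
    }

    // Splits an unknown-size spliterator (the kind BufferedReader.lines() builds)
    // `splits` times and records the size of each split-off batch.
    static long[] batchSizes(CountingIterator source, int splits) {
        Spliterator<String> rest = Spliterators.spliteratorUnknownSize(
                source, Spliterator.ORDERED | Spliterator.NONNULL);
        long[] sizes = new long[splits];
        for (int i = 0; i < splits; i++) {
            // trySplit() copies the next batch of elements into an array up front;
            // with an endless source it never returns null.
            sizes[i] = rest.trySplit().estimateSize();
        }
        return sizes;
    }

    public static void main(String[] args) {
        CountingIterator source = new CountingIterator();
        long[] sizes = batchSizes(source, 4);
        for (int i = 0; i < sizes.length; i++) {
            System.out.println("split " + (i + 1) + ": batch of " + sizes[i]);
        }
        System.out.println("elements pulled from the source: " + source.pulled);
    }
}
```

In OpenJDK the batch grows by 1024 elements per split (1024, 2048, 3072, 4096), so these four splits alone pull 10,240 elements from the source before any of them is handed to a consumer. With a socket that delivers lines slowly and never closes, that buffering is where the time goes.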

Note that processing an infinite source via the non-short-circuiting forEach is outside the scope of the Stream API anyway. Expecting a timely side effect is an assumption about the processing order of the Stream, and there is no guarantee about it.

This answer guides you to a work-around. You can use something like

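try(Socket socket = new Socket(ADDRESS, PORT);
    BufferedReader reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream()))) {

    socket.getOutputStream().write(QUERY);
    // BufferedReader.readLine() synchronizes internally, so the supplier
    // may safely be called from several worker threads at once.
    Stream.generate(() -> {
        try { return reader.readLine(); }
        catch (IOException ex) { throw new UncheckedIOException(ex); }
    }).parallel().forEach(System.out::println);
    // Note: the generated stream never ends; if the server ever closes the
    // connection, readLine() returns null and null is printed repeatedly.
} catch(IOException|UncheckedIOException e) {
    e.printStackTrace();
}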

But, as said, this isn’t the intended use case of the Stream API…
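One reading of why the workaround helps, offered as an assumption rather than something the answer states: in OpenJDK, the spliterator behind `Stream.generate` splits by halving its size estimate and never calls the supplier while splitting, so worker threads get work immediately and each one reads a line only when it is ready to process it. The sketch below (Java 8+, illustrative names) checks that by counting supplier calls instead of reading from a socket.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class GenerateSplitDemo {

    // Splits the spliterator behind Stream.generate(...) several times and reports
    // how often the supplier ran. In the workaround above, each supplier call
    // would be one reader.readLine().
    static int supplierCallsDuringSplits(int splits) {
        AtomicInteger calls = new AtomicInteger();
        Spliterator<String> rest = Stream
                .generate(() -> "line " + calls.getAndIncrement())
                .spliterator();
        for (int i = 0; i < splits; i++) {
            rest.trySplit(); // halves the size estimate (Long.MAX_VALUE initially); reads nothing
        }
        return calls.get();
    }

    // After one split, advancing the split-off piece invokes the supplier exactly once.
    static int supplierCallsAfterOneAdvance() {
        AtomicInteger calls = new AtomicInteger();
        Spliterator<String> rest = Stream
                .generate(() -> "line " + calls.getAndIncrement())
                .spliterator();
        Spliterator<String> piece = rest.trySplit();
        piece.tryAdvance(line -> { });
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("supplier calls after 10 splits: " + supplierCallsDuringSplits(10));
        System.out.println("supplier calls after one advance: " + supplierCallsAfterOneAdvance());
    }
}
```

`Stream.generate` is documented as producing an unordered stream, so with this workaround lines may be processed in a different order than they arrived.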

Holger