8

I need to have a buffered char stream, into which I write in one thread and from which I read in another thread. Right now I'm using PipedReader and PipedWriter for it, but those classes cause a performance problem: PipedReader does a wait(1000) when its internal buffer is empty, which causes my application to lag visibly.

Would there be some library which does the same thing as PipedReader/PipedWriter, but with better performance? Or will I have to implement my own wheels?

Esko Luontola
  • 73,184
  • 17
  • 117
  • 128
  • This may not be what you want, but would your writing thread be able to notify the reading thread after it has written something? And the reading thread will keep reading while `ready()` is true, then sleep when it's not? – Phil May 16 '10 at 11:45
  • Could you show us your code, please? The `wait(1000)` shouldn't be the problem, because the writer notifies the reader when something is written. The reader then escapes from his wait(). – tangens May 16 '10 at 11:53
  • Thanks, Phil. I'm right now not at home, so I can't test it, but based on the source code PipedWriter.flush() appears to notify the readers. I'll try it later today. – Esko Luontola May 16 '10 at 11:54

4 Answers4

8

The problem was that when something is written to the PipedWriter, it does not automatically notify the PipedReader that there is some data to read. When one tries to read PipedReader and the buffer is empty, the PipedReader will loop and wait using a wait(1000) call until the buffer has some data.

The solution is to call PipedWriter.flush() always after writing something to the pipe. All that the flush does is call notifyAll() on the reader. The fix to the code in question looks like this.

(To me the PipedReader/PipedWriter implementation looks very much like a case of premature optimization - why not to notifyAll on every write? Also the readers wait in an active loop, waking up every second, instead of waking only when there is something to read. The code also contains some todo comments, that the reader/writer thread detection which it does is not sophisticated enough.)

This same problem appears to be also in PipedOutputStream. In my current project calling flush() manually is not possible (can't modify Commons IO's IOUtils.copy()), so I fixed it by creating low-latency wrappers for the pipe classes. They work much better than the original classes. :-)

Esko Luontola
  • 73,184
  • 17
  • 117
  • 128
1

It should be fairly easy to wrap a char stream API around BlockingQueue.

I must say, however, it seems quite perverse that PipedReader would use polling to wait for data. Is this documented somewhere, or did you discover it for yourself somehow?

Marcelo Cantos
  • 181,030
  • 38
  • 327
  • 365
  • I think you meant to link to "http://java.sun.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html" – Phil May 16 '10 at 11:41
  • Yes, that's my plan B, if I can't find a class which already does what I need. – Esko Luontola May 16 '10 at 11:42
  • Thanks Phil. I was too quick on the draw with my Google search. :-) – Marcelo Cantos May 16 '10 at 11:44
  • Indeed it's quite perverse. I discovered the lag in my testing, profiled the cause to be in PipedReader, and found the reason to be the hardcoded wait(1000) calls in http://www.java2s.com/Open-Source/Java-Document/6.0-JDK-Core/io-nio/java/io/PipedReader.java.htm - It's not the first time that some of the old(since JDK1.1)Java standard libraries are coded uglily. – Esko Luontola May 16 '10 at 11:47
  • Just to clarify to readers: `wait(1000)` is not like `sleep(1000)` - it falls out immediately when it is `notify`ed that data is available. The timeout is to handle error situations - if the data-source thread dies unexpectedly and can no longer notify, specifically. – Luke Usherwood Dec 08 '16 at 07:05
1

@Esko Luontola, I've been reading through your code in the sbt package to try to understand what you are doing. It seems like you want to start up a Process and pass input to it, and have the result of the action be teed to different places. Is this at all correct?

I would try modifying the main loop in ReaderToWriterCopier so that instead of doing a read() - a blocking operation that apparently when a PipedReader is involved causes polling - you explicitly wait for the Writer to flush. The documentation is clear that flush causes any Readers to be notified.

I'm not sure how to run your code so I can't get deeper into it. Hope this helps.

Phil
  • 2,828
  • 1
  • 23
  • 20
  • My use case is that there are multiple readers for what a `Process` prints, and each of those readers needs its own copy of the stream, starting from the current moment and ending when the reader is closed. Different readers to not affect each other. The one reading the PipedReader is for example `OutputReader.waitForOutput()`. The writer is `ReaderToWriterCopier`. The main program is `SbtRunner`, and the test sources have `SbtRunnerTester` which uses it. SBT stands for http://code.google.com/p/simple-build-tool/ – Esko Luontola May 16 '10 at 17:31
0

I implemented something a little similar, and asked a question whether anyone else had any better thought out and tested code.

Community
  • 1
  • 1
Stephen Denne
  • 36,219
  • 10
  • 45
  • 60