22

I have two threads: one writes to a PipedOutputStream, the other reads from the corresponding PipedInputStream. The background is that one thread downloads data from a remote server and multiplexes it to several other threads through piped streams.

The problem is that sometimes (especially when downloading large (>50 MB) files) I get java.io.IOException: Pipe broken when trying to read from the PipedInputStream.
The Javadoc says: "A pipe is said to be broken if a thread that was providing data bytes to the connected piped output stream is no longer alive."
That is true: my writing thread really does die after writing all of its data to the PipedOutputStream.

Any solutions? How can I prevent PipedInputStream from throwing this exception? I want to be able to read all data that was written to the PipedOutputStream even after the writing thread has finished its work. (If anybody knows how to keep the writing thread alive until all data has been read, that solution is also acceptable.)

levanovd

4 Answers

20

Use a java.util.concurrent.CountDownLatch, and do not end the first thread before the second one has signaled that it has finished reading from the pipe.

Update: quick and dirty code to illustrate my comment below

    final PipedInputStream pin = getInputStream();
    final PipedOutputStream pout = getOutputStream();

    final CountDownLatch latch = new CountDownLatch(1);

    InputStream in = new InputStream() {

        @Override
        public int read() throws IOException {
            return pin.read();
        }

        @Override
        public void close() throws IOException {
            pin.close();
            // tell the writing side that the reader is done
            latch.countDown();
        }
    };


    OutputStream out = new OutputStream() {

        @Override
        public void write(int b) throws IOException {
            pout.write(b);
        }

        @Override
        public void close() throws IOException {
            pout.close();
            // keep the writing thread alive until the reader has closed its stream
            boolean interrupted = false;
            while (latch.getCount() != 0) {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    interrupted = true; // remember the interrupt and keep waiting
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    };

    //give the streams to your threads, they don't know a latch ever existed
    threadOne.feed(in);
    threadTwo.feed(out);
Jerome
  • Nice feature, it's definitely +1, but it requires sharing one instance of CountDownLatch between different threads. That is not ideal, because the writing and reading threads are created in different places and I want them not to know about each other. My architecture is such that they only know they should write/read to/from a given stream. – levanovd Dec 08 '09 at 11:48
  • Then maybe you could extend Piped[In|Out]putStream to handle the manipulation of the CountDownLatch. – Jerome Dec 08 '09 at 11:53
  • or write your own Input/OutputStream that wraps the Pipe and the Latch (see the sample code that I added in my answer) – Jerome Dec 08 '09 at 14:33
  • There is no need for all this additional code (in current Java versions), see [wds's answer](https://stackoverflow.com/a/1867111). If you close both streams you do not have to worry about encountering a broken pipe. – Marcono1234 Apr 22 '19 at 20:31
11

Are you closing your PipedOutputStream when the thread that's using it ends? You need to do this so the bytes in it will get flushed to the corresponding PipedInputStream.
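A minimal, self-contained sketch of this fix (the class and method names are illustrative, not from the question): the writer closes the PipedOutputStream in a finally block, which is what lets the reader see end-of-stream instead of a broken pipe once the writer thread dies.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeCloseDemo {

    // Writes a message on one thread, reads it back on the calling thread.
    static String pipeRoundTrip(String message) throws IOException, InterruptedException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        Thread writer = new Thread(() -> {
            try {
                out.write(message.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    out.close(); // crucial: signals end-of-stream to the reader
                } catch (IOException ignored) {
                }
            }
        });
        writer.start();

        // read() returns -1 only because the writer closed its end; without
        // that close(), this loop would eventually fail with "Pipe broken"
        // after the writer thread dies.
        StringBuilder sb = new StringBuilder();
        int b;
        while ((b = in.read()) != -1) {
            sb.append((char) b);
        }
        in.close();
        writer.join();
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pipeRoundTrip("hello pipe"));
    }
}
```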

wds
  • I really think something else is going wrong here, in any case you should never receive a broken pipe if the writing thread ended normally. If its data doesn't fit in the `PipedInputStream` it should just block until there's room. – wds Dec 08 '09 at 16:32
  • This should be the accepted answer. Maybe the behavior was different when the OP asked, or they were not properly closing it; however, if you close the streams once you are done, you will not encounter a broken pipe. Sadly the documentation is not very clear about this, but if in doubt you can look at the source code: https://github.com/openjdk/jdk/blob/0e14d5a98453407488057e6714f90f04d7dfa383/src/java.base/share/classes/java/io/PipedInputStream.java#L318 – Marcono1234 Apr 22 '19 at 20:28
2

PipedInputStream and PipedOutputStream are broken with regard to threading: they assume each instance is bound to a particular thread. This is bizarre. I suggest using your own (or at least a different) implementation.
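For illustration, a minimal sketch of the kind of replacement this answer and the comments below hint at: a byte pipe backed by a BlockingQueue, which has no notion of an owning thread. The class name, buffer capacity, and EOF sentinel here are my own choices, not from any library.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A tiny thread-agnostic byte pipe: since no thread "owns" either end,
// there is no "Pipe broken" when the writer finishes before the reader.
public class QueuePipe {
    private static final int EOF = -1;
    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024);

    public void write(int b) throws InterruptedException {
        queue.put(b & 0xFF); // blocks when the buffer is full
    }

    public void closeWriter() throws InterruptedException {
        queue.put(EOF); // sentinel marking end of stream
    }

    public int read() throws InterruptedException {
        int b = queue.take(); // blocks when the buffer is empty
        if (b == EOF) {
            queue.put(EOF); // re-insert so subsequent reads also see EOF
            return -1;
        }
        return b;
    }
}
```

A real implementation would wrap this in InputStream/OutputStream subclasses (translating InterruptedException to IOException) so existing stream-based code could consume it unchanged.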

Tom Hawtin - tackline
  • This response gives no value. In what manner do these classes assume this? – matsa Nov 29 '17 at 13:40
  • Each read/write stores a reference to the last reader/writer thread. When the buffer is empty/full and the last reading/writing thread has terminated, you get an exception even if another thread is now the owner but hasn't performed any operation on it yet. – cgrand Apr 06 '18 at 08:42
  • I agree entirely. They are just toys. Used them once in 1997 and immediately changed to a `Queue`. – user207421 Aug 13 '23 at 04:52
1

You might encounter problems with these classes when you use more than one reader or writer thread (see JDK-4028322).

However, most users likely only use one reader and one writer thread. Since this is the case for you as well, the most likely reason you are encountering a broken pipe is that you did not close the PipedOutputStream once you were done writing.

PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);

// You can of course also use an Executor or similar for this
new Thread(() -> {
    // Your method writing the data
    writeDataTo(out);
    // Close PipedOutputStream so that reader knows that the end of the data 
    // has been reached
    try {
        out.close();
    }
    catch (IOException e) {
        // Handle exception or simply ignore it; current implementation will NEVER 
        // throw an IOException: https://github.com/openjdk/jdk/blob/0e14d5a98453407488057e6714f90f04d7dfa383/src/java.base/share/classes/java/io/PipedOutputStream.java#L174
    }
}).start();

// Your method reading the data
readDataFrom(in);
// Close PipedInputStream so that writer fails instead of blocking infinitely in case 
// it tries to write again (for whatever reason)
in.close();

There is also no need to manually call PipedOutputStream.flush(); it only notifies waiting readers, and no data is lost if you call close() directly.

Sadly the documentation is at the moment not very clear about all of this. In general it is not a good idea to rely on the implementation, but in this case it might be the only sensible solution.

Marcono1234