9

I've seen two answers on SO that claim that the PipedInputStream and PipedOutputStream classes provided by Java are flawed. But they did not elaborate on what was wrong with them. Are they really flawed, and if so in what way? I'm currently writing some code that uses them, so I'd like to know whether I'm taking a wrong turn.

One answer said:

PipedInputStream and PipedOutputStream are broken (with regards to threading). They assume each instance is bound to a particular thread. This is bizarre.

To me that seems neither bizarre nor broken. Perhaps the author also had some other flaws in mind?

Another answer said:

In practice they are best avoided. I've used them once in 13 years and I wish I hadn't.

But that author could not recall what the problem was.


As with all classes, and especially classes used in multiple threads, you will have problems if you misuse them. So I do not consider the unpredictable "write end dead" IOException that PipedInputStream can throw to be a flaw (failing to close() the connected PipedOutputStream is a bug; see the article Whats this? IOException: Write end dead, by Daniel Ferbers, for more information). What other claimed flaws are there?

Raedwald
  • This http://stackoverflow.com/questions/484119/why-doesnt-more-java-code-use-pipedinputstream-pipedoutputstream kind of covers it. They're not really "flawed", just a bit tricky and usually a sign of a code smell. If you're 100% sure you need them and that there's no error in the design, there's no real problem with using them... – TC1 Feb 28 '12 at 14:42
  • 1
    I took a quick look because I wanted to use one. It is at least "under-featured": the reading thread doesn't really wait for the writing thread to write the complete read request, and it aborts with an EOF exception if the writer closes the pipe, etc. It has very primitive thread handling and synchronization, and requires the buffer to be as large as the largest read request. – peterk Mar 27 '13 at 00:17

5 Answers

9

They are not flawed.

As with all classes, and especially classes used in multiple threads, you will have problems if you misuse them. The unpredictable "write end dead" IOException that PipedInputStream can throw is not a flaw (failing to close() the connected PipedOutputStream is a bug; see the article Whats this? IOException: Write end dead, by Daniel Ferbers, for more information).
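
As a rough illustration of the point about close(), here is a minimal sketch in which the writer thread always closes its end, so the reader sees a normal end-of-stream instead of a "write end dead" IOException. The class and method names (PipeCloseDemo, transfer) are made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

class PipeCloseDemo {

    // Sends the message through a pipe from a separate writer thread
    // and returns whatever the reading side received.
    static String transfer(String message) throws IOException, InterruptedException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        Thread writer = new Thread(() -> {
            // try-with-resources closes the write end even if write() throws,
            // so the reader gets a clean end-of-stream.
            try (PipedOutputStream o = out) {
                o.write(message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                // The reader went away first; there is nothing more to send.
            }
        });
        writer.start();

        ByteArrayOutputStream received = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        int n;
        while ((n = in.read(buf)) != -1) { // -1 once the writer has closed its end
            received.write(buf, 0, n);
        }
        in.close();
        writer.join();
        return new String(received.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(transfer("hello through the pipe"));
    }
}
```

If the try-with-resources were removed and the writer thread simply ended, the reader could no longer rely on getting -1 and would be exposed to the exception discussed above.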

Raedwald
  • I'd like your opinion on a problem I recently had. Could you please take a look? http://stackoverflow.com/questions/21884188/android-pipedinputstream-messes-up-data – fabian Feb 19 '14 at 19:03
3

I have used them successfully in my project, and they are invaluable for modifying streams on the fly and passing them around. The only drawback seemed to be that PipedInputStream has a small default buffer (1024 bytes) while my output stream was pumping in around 8 KB at a time.

There is no defect with it and it works perfectly well.
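
If the small default buffer is the concern, one option is the two-argument PipedInputStream constructor (available since Java 6), which takes the pipe size. The following is only a sketch: the class and method names are invented, and it writes and inspects the pipe from a single thread purely to make the capacity visible, which is safe here only because the payload fits in the buffer.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

class PipeSizeDemo {

    // Writes `payload` bytes into a pipe of `pipeSize` bytes and reports
    // how many bytes are buffered and ready to read.
    // Caution: if payload > pipeSize, the single-threaded write would block forever.
    static int bufferedAfterWrite(int pipeSize, int payload) throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out, pipeSize);
        out.write(new byte[payload]);
        int available = in.available();
        out.close();
        in.close();
        return available;
    }

    public static void main(String[] args) throws IOException {
        // An 8 KB burst fits into a 16 KB pipe without the writer having to wait.
        System.out.println(bufferedAfterWrite(16 * 1024, 8 * 1024));
    }
}
```

In real code the two ends would still live on separate threads; the larger buffer just means the writer blocks less often.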

Example in Groovy:

public class Runner {

    final PipedOutputStream source = new PipedOutputStream()
    final PipedInputStream sink = new PipedInputStream()

    public static void main(String[] args) {
        new Runner().doit()
        println "Finished main thread"
    }

    public void doit() {
        // Connect the two ends before either thread touches them.
        sink.connect(source)

        new Producer(source).start()
        // Buffer the read side to compensate for the pipe's small internal buffer.
        new Consumer(new BufferedInputStream(sink)).start()
    }
}

class Producer extends Thread {

    OutputStream source

    Producer(OutputStream source) {
        this.source = source
    }

    @Override
    public void run() {
        byte[] data = new byte[1024]

        println "Running the Producer..."
        FileInputStream fin = new FileInputStream("/Users/ganesh/temp/www/README")
        try {
            int amount
            while ((amount = fin.read(data)) > 0) {
                // Write the raw bytes; a String round trip could corrupt non-text data.
                source.write(data, 0, amount)
                Thread.sleep(5)
            }
        } finally {
            fin.close()
            // Always close the write end so the consumer sees end-of-stream.
            source.close()
        }
    }
}

class Consumer extends Thread {

    InputStream ins

    Consumer(InputStream ins) {
        this.ins = ins
    }

    @Override
    public void run() {
        println "Consumer running"

        int amount
        byte[] data = new byte[1024]
        // read() returns -1 once the producer has closed its end.
        while ((amount = ins.read(data)) >= 0) {
            String s = new String(data, 0, amount)
            println "< $s"
            Thread.sleep(5)
        }
    }
}

Ganesh Krishnan
1

One flaw might be that there is no clear way for the writer to indicate to the reader that it encountered a problem:

PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out);

new Thread(() -> {
    try {
        writeToOut(out);
        out.close();
    }
    catch (SomeDataProviderException e) {
        // Have to notify the reading side, but how?
    }
}).start();

readFromIn(in);

The writer could close out, but maybe the reader misinterprets that as end of data. To handle this correctly additional logic is needed. It would be easier if functionality to manually break the pipe was provided.

There is now JDK-8222924 which requests a way to manually break the pipe.
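
Until something like that exists, the "additional logic" could look roughly like the sketch below: the writer records its failure in a shared AtomicReference before closing the pipe, and the reader checks that reference once it hits end-of-stream. This is just one possible approach, not an official API; PipeErrorSignalDemo, readAll and the simulated failure are all hypothetical.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicReference;

class PipeErrorSignalDemo {

    // Reads the pipe to its end and returns the byte count, or throws
    // an IOException if the writer reported a failure.
    static int readAll(boolean writerFails) throws IOException, InterruptedException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);
        AtomicReference<Exception> failure = new AtomicReference<>();

        Thread writer = new Thread(() -> {
            try {
                out.write(new byte[] {1, 2, 3});
                if (writerFails) {
                    // Stand-in for a failing data provider (hypothetical).
                    throw new IllegalStateException("data provider failed");
                }
                out.write(new byte[] {4, 5});
            } catch (Exception e) {
                // Record the failure BEFORE closing, so the reader sees it at EOF.
                failure.set(e);
            } finally {
                try {
                    out.close();
                } catch (IOException ignored) {
                    // Nothing useful to do if closing fails.
                }
            }
        });
        writer.start();

        int total = 0;
        byte[] buf = new byte[64];
        int n;
        while ((n = in.read(buf)) != -1) {
            total += n;
        }
        in.close();
        writer.join();

        // End-of-stream alone is ambiguous; the shared reference disambiguates it.
        Exception e = failure.get();
        if (e != null) {
            throw new IOException("writer failed after " + total + " bytes", e);
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readAll(false));
        try {
            readAll(true);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that a plain try-with-resources would not work here, because the resource is closed before the catch block runs and the reader could observe end-of-stream before the failure is recorded.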

Marcono1234
0

From my point of view there is a flaw. More precisely, there is a high risk of a deadlock if the thread that should pump data into the PipedOutputStream dies prematurely, before it actually writes a single byte into the stream. The problem in such a situation is that the implementation of the piped streams is not able to detect the broken pipe. Consequently, the thread reading from the PipedInputStream will wait forever (i.e. deadlock) in its first call to read().

Broken pipe detection actually relies on the first call to write(), as the implementation will then lazily initialize the write-side thread, and only from that point in time will broken pipe detection work.

The following code reproduces the situation:

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import org.junit.Test;

public class PipeTest
{
    @Test
    public void test() throws IOException
    {
        final PipedOutputStream pout = new PipedOutputStream();
        PipedInputStream pin = new PipedInputStream();

        pout.connect(pin);

        Thread t = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    if(true)
                    {
                        throw new IOException("asd");
                    }
                    pout.write(0); // first byte, which never gets written
                    pout.close();
                }
                catch(IOException e)
                {
                    throw new RuntimeException(e);
                }
            }
        });
        t.start();

        pin.read(); // waits forever, i.e. deadlocks
    }
}
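
For comparison, here is a hedged variant of the same scenario in which the writer thread closes the output stream even though it fails before writing anything. It is a sketch outside JUnit with invented names (PipeEarlyFailureDemo, firstRead, failBeforeWriting); with the close in place, the first read() returns -1 rather than blocking, although the reader still cannot tell a failure from a normal end of data.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

class PipeEarlyFailureDemo {

    // Returns the result of the first read() when the writer fails
    // before writing a single byte but still closes its end.
    static int firstRead() throws IOException, InterruptedException {
        PipedOutputStream pout = new PipedOutputStream();
        PipedInputStream pin = new PipedInputStream(pout);

        Thread t = new Thread(() -> {
            // The stream is closed when this block exits, normally or not.
            try (PipedOutputStream closing = pout) {
                failBeforeWriting();
                closing.write(0); // never reached
            } catch (IOException e) {
                // Failure is swallowed here; the reader only sees end-of-stream.
            }
        });
        t.start();

        int result = pin.read(); // returns once the write end is closed
        t.join();
        pin.close();
        return result;
    }

    // Simulates a writer that dies before producing any data.
    static void failBeforeWriting() throws IOException {
        throw new IOException("asd");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstRead());
    }
}
```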
Christian K.
  • You want your write and read operations to be in independent threads. To have the consumer not immediately exit because of a race condition, it is best to do a pin.available() check to see that it is greater than 0 prior to conducting a read operation. Then you can do repeated available and read operations until available == 0 again. You also want to have exception handling for the read side in case the writer disconnects. – Robert Casey Jul 22 '15 at 22:27
  • This test code fails to close the output stream when there is an exception. If you surround the content of your try block with `try (OutputStream pout_closed = pout) { ... }` the code will not deadlock. – Alex Q Jul 12 '16 at 20:42
  • @RobertCasey you would still have to wait until `available()` returns > 0, which is then essentially the same as directly calling `read()`. Additionally, you cannot really do much exception handling when the reading thread is already blocked on the `read()` call. – Marcono1234 Apr 19 '19 at 22:02
  • @AlexQ closing the pipe would indicate to the reader that the end of the data has been reached. It might then handle this situation incorrectly, and you would need additional logic to determine whether the end was actually reached or the writer encountered a problem. – Marcono1234 Apr 19 '19 at 22:07
0

The flaws that I see with the JDK implementation are:

1) No timeouts: the reader or the writer can block indefinitely.

2) Suboptimal control over when data is transferred (it should happen only on flush, or when the circular buffer is full).

So I created my own implementation to address the above (the timeout value is passed via a ThreadLocal):

PipedOutputStream

How to use:

PiedOutputStreamTest

Hope it helps...

user2179737