
I have a component that has given me data in an output stream (a ByteArrayOutputStream), and I need to write it into a BLOB field of a SQL database without creating temporary buffers, hence the need to get an input stream.

Based on answers here and here, I came up with the following method to get an input stream from an output stream:

private PipedInputStream getInputStream(ByteArrayOutputStream outputStream) throws InterruptedException
{
    PipedInputStream pipedInStream = new PipedInputStream();
    Thread copyThread = new Thread(new CopyStreamHelper(outputStream, pipedInStream));
    copyThread.start();
    // Wait for copy to complete
    copyThread.join();
    return pipedInStream;
}

class CopyStreamHelper implements Runnable
{
    private ByteArrayOutputStream outStream;
    private PipedInputStream pipedInStream;

    public CopyStreamHelper (ByteArrayOutputStream _outStream, PipedInputStream _pipedInStream)
    {
        outStream = _outStream;
        pipedInStream = _pipedInStream;
    }

    public void run()
    {
        PipedOutputStream pipedOutStream = null;
        try
        {
            // write the original OutputStream to the PipedOutputStream
            pipedOutStream = new PipedOutputStream(pipedInStream);
            outStream.writeTo(pipedOutStream);
        }
        catch (IOException e) 
        {
            // logging and exception handling should go here
        }
        finally
        {
            IOUtils.closeQuietly(pipedOutStream);
        }
    }
}

Please note that the output stream already contains the written data, which can run up to 1-2 MB. However, regardless of whether I do this in two separate threads or in the same thread, PipedInputStream always hangs at the following:

Object.wait(long) line: not available [native method]   
PipedInputStream.awaitSpace() line: not available   
Santosh
  • The component should return an InputStream. An OutputStream is a sink and not meant to be read. In this case this is possible because of the special nature of ByteArrayOutputStream. Your goal to avoid temp buffers is lost because the ByteArrayOutputStream already contains the full data when it is passed. – Robert Kühne Mar 26 '15 at 19:14
  • @Sponiro, agreed but I want to prevent creating another buffer just to get an input stream. The said component generates output so I am not clear how it can return an InputStream? – Santosh Mar 27 '15 at 04:21
  • One simple solution would be to write your data to a file first and open an InputStream for reading. If you really want to use PipedInputStream for a real producer-consumer scenario you want to use two threads. It is perfectly possible but a bit more complicated. Your component would live in one thread and hand out an InputStream (a PipedInputStream really) and another thread would pick that one up and read from it. Your solution above creates a thread and waits for its results which is very different. – Robert Kühne Mar 27 '15 at 08:11
  • @Sponiro, I considered and dropped writing to a file since I think that is unnecessary. What I have here is not a producer-consumer scenario in the usual sense, where both can happen asynchronously. The situation here is that the component has finished producing the output and has written the data to an OutputStream. Now I need to get that data and send it off to the DB via a SQL statement. Given this, I was trying to find the most efficient way of doing it. From the look of it, maybe this is not possible, which would imply that the similar answers on the linked SO questions don't work? – Santosh Mar 27 '15 at 10:51
  • @Sponiro, continuing... regarding the need for two threads to get the piped streams working properly, the code I have shown above *is* doing that. Do you see something wrong with it, since it's not working as I expected? – Santosh Mar 27 '15 at 10:53
  • Yes, you have two threads, main and copyThread. But you are using them wrong. It is impossible to explain proper use of threads in a comment. But the basic problem is that your threads do not work in parallel. Your main thread starts the copy thread - which is ok. But then you wait for the copy thread with join which makes the whole thing non-parallel. – Robert Kühne Mar 27 '15 at 11:07
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/73928/discussion-between-santosh-and-sponiro). – Santosh Mar 27 '15 at 11:33
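
The point made in the comments above can be sketched as follows. A PipedInputStream has a small internal buffer (1024 bytes by default), so a writer pushing 1-2 MB blocks in awaitSpace() until something reads from the other end; calling join() before reading means nothing ever does. This is only a minimal sketch under stated assumptions: the class name `PipeWithoutJoin` and the helper `asInputStream` are hypothetical, and the consumer in `main` simply counts bytes instead of writing to a database.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

class PipeWithoutJoin {

    // Hypothetical helper: starts the writer thread and returns at once.
    // There is deliberately no join() here; the caller's reads drain the pipe.
    static InputStream asInputStream(ByteArrayOutputStream source) throws IOException {
        PipedInputStream in = new PipedInputStream();
        // Connect both ends before the writer thread starts.
        PipedOutputStream out = new PipedOutputStream(in);
        Thread writer = new Thread(() -> {
            // Closing the output end signals end-of-stream to the reader.
            try (PipedOutputStream o = out) {
                source.writeTo(o);
            } catch (IOException e) {
                // Reached if the reader closes its end early; log as appropriate.
            }
        });
        writer.setDaemon(true);
        writer.start();
        return in;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] payload = new byte[2 * 1024 * 1024]; // stand-in for the 1-2 MB of data
        baos.write(payload, 0, payload.length);

        long total = 0;
        byte[] chunk = new byte[8192];
        try (InputStream in = asInputStream(baos)) {
            int n;
            while ((n = in.read(chunk)) != -1) {
                total += n; // a real consumer would hand the stream to the database driver
            }
        }
        System.out.println(total); // number of bytes that came through the pipe
    }
}
```

Note that this costs one extra thread per conversion and still copies every byte through the pipe, so it does not avoid work compared with wrapping the existing bytes directly.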

3 Answers


You are overcomplicating the solution:

ByteArrayOutputStream baos = ...;
byte[] data = baos.toByteArray();
return new ByteArrayInputStream(data);
ControlAltDel
  • The OP says he wants to do it `without creating temp buffers`. – Jonas Czech Mar 26 '15 at 19:06
  • I am trying to avoid creating a temp buffer since the data size is quite large (1-2 MB) and there will be many parallel threads doing this. This is my fallback solution if I can't solve the original problem. – Santosh Mar 27 '15 at 04:23
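
On the copy that the comments above object to: toByteArray() duplicates the internal array, but ByteArrayOutputStream keeps its data in the protected fields `buf` and `count`, so a subclass can wrap that array directly. The following is only a sketch, and it assumes you control where the stream is created, which may not hold if the component constructs its own ByteArrayOutputStream. The class name `ExposedByteArrayOutputStream` and the method `toInputStream` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical subclass: exposes the internal buffer as an input stream without copying it.
class ExposedByteArrayOutputStream extends ByteArrayOutputStream {

    // buf and count are protected fields inherited from ByteArrayOutputStream.
    // The returned stream shares the array, so do not write to this stream
    // while the returned input stream is still being read.
    synchronized ByteArrayInputStream toInputStream() {
        return new ByteArrayInputStream(buf, 0, count);
    }
}

class ExposedDemo {
    public static void main(String[] args) {
        ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
        byte[] data = "hello".getBytes(StandardCharsets.US_ASCII);
        out.write(data, 0, data.length);

        ByteArrayInputStream in = out.toInputStream();
        int b;
        StringBuilder sb = new StringBuilder();
        while ((b = in.read()) != -1) {
            sb.append((char) b);
        }
        System.out.println(sb); // the bytes written above, read back without a second array
    }
}
```

If the component accepts any OutputStream, passing it an instance of such a subclass would avoid both the extra thread and the second buffer.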

I have worked out a very simple demo for the use of PipedInputStream/PipedOutputStream. It may or may not fit your use case.

A producing class writing into PipedOutputStream:

public class Producer implements Runnable {

    private final PipedOutputStream pipedOutputStream;
    private final PipedInputStream pipedInputStream;

    public Producer() throws IOException {
        this.pipedOutputStream = new PipedOutputStream();
        this.pipedInputStream = new PipedInputStream(pipedOutputStream);
    }

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @Override
    public void run() {

        try(InputStream inputStream = ByteStreams.limit(new RandomInputStream(), 100000)) {
            // guava copy function
            ByteStreams.copy(inputStream, pipedOutputStream);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedOutputStream.close();
            } catch (IOException e) {
                // no-op
            }
        }
    }

    public static void main(String[] args) throws IOException {

        try {
            Producer producer = new Producer();
            Consumer consumer = new Consumer(producer);

            Thread thread1 = new Thread(producer);
            Thread thread2 = new Thread(consumer);

            thread1.start();
            thread2.start();

            thread1.join();
            thread2.join();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

A consumer just counting the bytes:

public class Consumer implements Runnable {

    private final Producer producer;

    public Consumer(Producer producer) {
        this.producer = producer;
    }

    @Override
    public void run() {

        try (PipedInputStream pipedInputStream = producer.getPipedInputStream()) {

            int counter = 0;
            while (pipedInputStream.read() != -1) {
                counter++;
            }

            System.out.println(counter);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Robert Kühne

At some level there has to be a buffer. See Connecting an input stream to an outputstream.

My favorite answer from there is from Dean Hiller:

// IOUtils is from Apache Commons IO; copy() declares IOException
void feedInputToOutput(InputStream in, OutputStream out) throws IOException {
    IOUtils.copy(in, out);
}

See the API for details.

Andreas
  • What I need is exactly the opposite: to get an InputStream such that I can read from the OutputStream. – Santosh Mar 27 '15 at 04:24
  • There is no outputstream to an inputstream. An inputstream is a source; you read things from an inputstream. An outputstream is a sink; you write things to an outputstream. If you are looking for something which you can read from as well as write to, perhaps a list or a queue would be more suitable. – Andreas Mar 27 '15 at 16:09
  • Andreas - I believe Santosh is trying to redirect the contents of an output stream to another input stream. I have the same problem. I get a thumbnailed image as an output stream, and I wish to serve this over HTTP through an input stream. – Sridhar Sarnobat Oct 01 '15 at 05:09