4

How do i convert type Stream<Object> into an InputStream? Currently, I get the iterator and loop through all of the data converting it to a byteArray and adding it to an inputStream:

 ByteArrayOutputStream bos = new ByteArrayOutputStream();
 ObjectOutputStream oos = new ObjectOutputStream(bos);

 Iterator<MyType> myItr = MyObject.getStream().iterator();

 while (myItr.hasNext()) {   

       oos.writeObject(myItr.next().toString()
         .getBytes(StandardCharsets.UTF_8));
   }
   oos.flush();
   oos.close();

   InputStream is = new ByteArrayInputStream(bao.toByteArray());

What is the overhead of doing this though? If my stream contains a terabyte of data, wouldn't I be sucking a terabyte of data into memory? Is there any better way to achieve this?

shmosel
  • 49,289
  • 6
  • 73
  • 138
BigBug
  • 6,202
  • 23
  • 87
  • 138
  • Are you sure that this kind of `InputStream` is what you need? You are converting objects to strings, get their UTF-8 representation as byte arrays and use object serialization for these array objects. It’s entirely unclear what you want to receive at the other end, currently it’s neither, the objects nor the strings. You could write strings directly instead or you could write a plain textual representation much simpler using a `Writer`, without the overhead of the object serialization protocol, but neither would recreate the original objects. – Holger Sep 27 '17 at 10:19

2 Answers2

2

You should be able to convert the OutputStream into an InputStream using a pipe:

PipedOutputStream pos = new PipedOutputStream();
InputStream is = new PipedInputStream(pos);

new Thread(() -> {
    try (ObjectOutputStream oos = new ObjectOutputStream(pos)) {
        Iterator<MyType> myItr = MyObject.getStream().iterator();
        while (myItr.hasNext()) {
            oos.writeObject(myItr.next().toString()
                .getBytes(StandardCharsets.UTF_8));
        }
    } catch (IOException e) {
        // handle closed pipe etc.
    }
}).start();

Inspired by this answer.

shmosel
  • 49,289
  • 6
  • 73
  • 138
  • Hm... i don't think i quite understand what this code is supposed to do? the pos is never assigned any data :S and it looks like we are looping through and adding the contents from myItr to the oos for no reason as it in no way ties back into the contents that the inputstream contains? Am i missing something here? – BigBug Aug 31 '17 at 02:44
  • 1
    `is` and `oos` are both tied to `pos`, which pipes the data from the output stream to the input stream. But now that I think of it, it would still have to buffer it all in memory, so this doesn't really solve your problem. – shmosel Aug 31 '17 at 02:51
  • 2
    @BigBug Ok I was wrong about it buffering all into memory. It actually blocks once it hits the buffer size, meaning you have to feed the output stream on a separate thread. See my updated answer. – shmosel Aug 31 '17 at 03:43
2

Would this work for you?

https://gist.github.com/stephenhand/292cdd8bba7a452d83c51c00d9ef113c

It's an InputStream implementation that takes a Stream<byte[]> as input data. You just need to .map() your abitrary objects to byte arrays however you want each object to be represented as bytes.

It only calls a terminal operation on the Stream when the InputStream is read, pulling objects off the Stream as the consumer reads more of the InputStream so it never loads the whole set into memory