As I understand it, an InputStream is a stream of bytes. I am interested in converting an InputStream object to a stream of bytes. Basically, I need an implementation of the following method.

public Stream<byte[]> toStream(final InputStream is, final int bufferSize);

What would be the best way to get this done? The buffer size is the number of bytes read from the InputStream at a time.

thisisshantzz
  • How are you currently reading/processing your `InputStream`? – ernest_k Nov 07 '19 at 08:04
  • Why do you want a `Stream`? How are you planning on processing it? – Kayaman Nov 07 '19 at 08:07
  • @Kayaman I have a file somewhere on the cloud and I have an InputStream to read the contents of the file. I am building a service that allows a client to download this file but it should not temporarily store the entire file in memory on the server. So I was thinking of sending a Flux object back to the client so that the file can be streamed over to the client. I went through the Flux object's javadoc and it is mentioned there that we can create a Flux object from a Stream. – thisisshantzz Nov 07 '19 at 08:13
  • @ernest_k, I am not doing anything at the moment because I don't want to store the file temporarily as say a temp file or byte array on the server's memory. – thisisshantzz Nov 07 '19 at 08:14
  • @thisisshantzz then you're looking at the wrong approach. You don't need a `Stream`, you need to connect the `InputStream` to the client's `OutputStream`, and just stream bytes normally. How this is done specifically depends on your environment, but for example with Spring you could use a `StreamingResponseBody`. – Kayaman Nov 07 '19 at 08:22
  • Why? The last thing you want to do is store the file in memory anywhere, let alone on a server. – user207421 Nov 07 '19 at 08:23
  • In the signature of the method you have posted, you are trying to convert an InputStream to a Stream of Array of bytes not Stream of bytes. First of all you have to explain what is the meaning of that array of bytes. If they are text lines you can use this stackoverflow.com/questions/30336257/…. If you want a stream of bytes instead of stream of array of bytes you can use the readAllBytes method of the inputStream and then convert the array to a Stream – JArgente Nov 07 '19 at 08:24
  • @Kayaman, would it be possible to connect this `InputStream` to say a Flux object if I am say using Spring Webflux? – thisisshantzz Nov 07 '19 at 08:27
  • @thisisshantzz if you just want to "redirect" the `InputStream` to the client, then any additional `Flux` or `byte[]` in between is completely unnecessary. It's just normal basic streaming. See [here](https://stackoverflow.com/questions/20333394/return-a-stream-with-spring-mvcs-responseentity) for more info. – Kayaman Nov 07 '19 at 08:28
  • @JArgente Question already explains: *"The `buffersize` is the **number of bytes** read from the InputStream **at a time**"*. In order for stream to receive a number of bytes at a time, the stream elements must be **arrays** of bytes. – Andreas Nov 07 '19 at 08:48
  • @JArgente If you want a stream of bytes instead of stream of array of bytes, you'd read one byte at a time and send it down the stream. One of the points of Streams is to process each element separately, so as not to use too much memory. Reading the entire file into memory is not the way to do that. – Andreas Nov 07 '19 at 08:48
  • Adding to what @Kayaman pointed out, you can also look into redirecting the `InputStream` to an `OutputStream` using [`transferTo`](https://docs.oracle.com/javase/9/docs/api/java/io/InputStream.html#transferTo-java.io.OutputStream-), introduced in Java 9. – Naman Nov 07 '19 at 09:17
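
The comments above suggest copying the `InputStream` straight to the client's `OutputStream` instead of going through a `Stream`. A minimal sketch of that idea with plain JDK classes follows, assuming Java 9 or later for `InputStream.transferTo`. The class name `DirectCopyExample`, the helper `copy`, and the in-memory streams standing in for the cloud file and the HTTP response are all hypothetical and only there for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical example class, not part of the original question or answer.
final class DirectCopyExample {

    // Copies all bytes from 'in' to 'out' and closes 'in' afterwards.
    // transferTo (Java 9+) copies through an internal buffer, so the
    // whole payload is never held in memory at once.
    static long copy(InputStream in, OutputStream out) throws IOException {
        try (InputStream source = in) {
            return source.transferTo(out);
        }
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-ins for the remote file and the response body.
        byte[] payload = "hello, stream".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(payload), sink);
        System.out.println(copied + " bytes copied");
    }
}
```

In a real service the `OutputStream` would be the one the web framework hands you for the response body; how you obtain it depends on the framework.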

1 Answer

You need to write your own Spliterator, something like this:

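import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.function.Consumer;

public final class ChunkingInputStreamSpliterator implements Spliterator<byte[]> {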

    private final InputStream is;
    private final int bufferSize;

    public ChunkingInputStreamSpliterator(InputStream is, int bufferSize) {
        this.is = is;
        this.bufferSize = bufferSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super byte[]> action) {
        byte[] bytes;
        try {
            bytes = this.is.readNBytes(this.bufferSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (bytes.length == 0)
            return false;
        action.accept(bytes);
        return true;
    }

    @Override
    public Spliterator<byte[]> trySplit() {
        return null; // cannot split an InputStream
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // unknown
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED | Spliterator.NONNULL;
    }

}

Then implement your method like this:

public static Stream<byte[]> toStream(InputStream is, int bufferSize) {
    return StreamSupport.stream(new ChunkingInputStreamSpliterator(is, bufferSize), false);
}
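
To see what the resulting stream looks like, here is a small self-contained sketch that chunks an in-memory `ByteArrayInputStream`. To keep it short it builds the spliterator inline on top of `Spliterators.AbstractSpliterator` rather than the hand-written class above, so it is a stand-in with the same `tryAdvance` logic, not the exact same class. The names `ChunkingDemo` and `chunkSizes` are hypothetical, and `readNBytes(int)` assumes Java 11 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical demo class; the chunking logic mirrors the answer's tryAdvance.
final class ChunkingDemo {

    static Stream<byte[]> toStream(InputStream is, int bufferSize) {
        Spliterator<byte[]> chunks = new Spliterators.AbstractSpliterator<byte[]>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override
            public boolean tryAdvance(Consumer<? super byte[]> action) {
                byte[] bytes;
                try {
                    bytes = is.readNBytes(bufferSize); // Java 11+
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                if (bytes.length == 0)
                    return false; // end of input
                action.accept(bytes);
                return true;
            }
        };
        return StreamSupport.stream(chunks, false); // sequential stream
    }

    // Returns the length of every chunk produced for the given data.
    static List<Integer> chunkSizes(byte[] data, int bufferSize) {
        return toStream(new ByteArrayInputStream(data), bufferSize)
                .map(chunk -> chunk.length)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(new byte[10], 4)); // prints [4, 4, 2]
    }
}
```

Note that the stream does not close the underlying `InputStream`; the caller still has to do that, for example with try-with-resources around the `InputStream` itself.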

If you are not on Java 11 or later, and so don't have the very convenient `readNBytes(int)` method, then do that part yourself like this:

public boolean tryAdvance(Consumer<? super byte[]> action) {
    byte[] bytes = new byte[this.bufferSize];
    int len = 0;
    try {
        for (int read; len < bytes.length; len += read)
            if ((read = this.is.read(bytes, len, bytes.length - len)) <= 0)
                break;
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    if (len == 0)
        return false;
    if (len < bytes.length)
        bytes = Arrays.copyOf(bytes, len); // trim the last, partially filled chunk
    action.accept(bytes);
    return true;
}
Andreas
  • Would it be wise to replace `Long.MAX_VALUE` with `this.is.available()/this.bufferSize` in the `estimateSize` method of the Spliterator? – thisisshantzz Nov 07 '19 at 09:04
  • @thisisshantzz if you know the size, e.g. because it’s a file, you should provide the size upfront (though, in that case, a `FileChannel` would be way more efficient). Otherwise, `InputStream.available()` does not provide the correct size. It only tells how much can be read *without blocking*. – Holger Nov 07 '19 at 09:30
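
Following the last comment, one way to "provide the size upfront" could look like the sketch below: the caller passes the total byte count to the spliterator, which then reports the number of remaining chunks from `estimateSize()` instead of `Long.MAX_VALUE`. This is only an illustration under stated assumptions. The class name `SizedChunkingSpliterator` and the `totalBytes` parameter are hypothetical, `readNBytes(int)` assumes Java 11 or later, and the sketch deliberately does not report `Spliterator.SIZED`, because the supplied length is trusted rather than verified.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Spliterator;
import java.util.function.Consumer;

// Hypothetical variant of the answer's spliterator with a caller-supplied length.
final class SizedChunkingSpliterator implements Spliterator<byte[]> {

    private final InputStream is;
    private final int bufferSize;
    private long remainingBytes; // counted down as chunks are read

    SizedChunkingSpliterator(InputStream is, int bufferSize, long totalBytes) {
        if (bufferSize <= 0 || totalBytes < 0)
            throw new IllegalArgumentException("bufferSize must be > 0 and totalBytes >= 0");
        this.is = is;
        this.bufferSize = bufferSize;
        this.remainingBytes = totalBytes;
    }

    @Override
    public boolean tryAdvance(Consumer<? super byte[]> action) {
        byte[] bytes;
        try {
            bytes = is.readNBytes(bufferSize); // Java 11+
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (bytes.length == 0)
            return false;
        // Never go negative if the stream turns out longer than announced.
        remainingBytes = Math.max(0, remainingBytes - bytes.length);
        action.accept(bytes);
        return true;
    }

    @Override
    public Spliterator<byte[]> trySplit() {
        return null; // cannot split an InputStream
    }

    @Override
    public long estimateSize() {
        // Number of chunks still expected, rounded up for a partial last chunk.
        return remainingBytes / bufferSize + (remainingBytes % bufferSize == 0 ? 0 : 1);
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED | Spliterator.NONNULL;
    }

    public static void main(String[] args) {
        SizedChunkingSpliterator s =
                new SizedChunkingSpliterator(new ByteArrayInputStream(new byte[10]), 4, 10);
        System.out.println(s.estimateSize()); // prints 3
    }
}
```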