59

I have a method that takes an InputStream and reads data from it. I would like to use this method with a ByteBuffer also. Is there a way to wrap a ByteBuffer so it can be accessed as a stream?

Erik
  • 743
  • 1
  • 5
  • 11
  • Is it a native ByteBuffer, or backed by a byte array? – EboMike Dec 02 '10 at 06:24
  • Backed by a byte array in this case – Erik Dec 02 '10 at 06:30
  • 3
    I found that Jackson has it: [Jackson ByteBufferBackedInputStream](https://fasterxml.github.io/jackson-databind/javadoc/2.8/com/fasterxml/jackson/databind/util/ByteBufferBackedInputStream.html#ByteBufferBackedInputStream(java.nio.ByteBuffer)) com.fasterxml.jackson.databind.util – Geoffrey Hendrey Aug 31 '17 at 04:53

7 Answers7

86

There seem to be some bugs with the implementation referred to by Thilo, and also copy and pasted on other sites verbatim:

  1. ByteBufferBackedInputStream.read() returns a sign extended int representation of the byte it reads, which is wrong (value should be in range [-1..255])
  2. ByteBufferBackedInputStream.read(byte[], int, int) does not return -1 when there are no bytes remaining in the buffer, as per the API spec

ByteBufferBackedOutputStream seems relatively sound.

I present a 'fixed' version below. If I find more bugs (or someone points them out) I'll update it here.

Updated: removed synchronized keywords from read/write methods

InputStream

public class ByteBufferBackedInputStream extends InputStream {

    ByteBuffer buf;

    public ByteBufferBackedInputStream(ByteBuffer buf) {
        this.buf = buf;
    }

    public int read() throws IOException {
        if (!buf.hasRemaining()) {
            return -1;
        }
        return buf.get() & 0xFF;
    }

    public int read(byte[] bytes, int off, int len)
            throws IOException {
        if (!buf.hasRemaining()) {
            return -1;
        }

        len = Math.min(len, buf.remaining());
        buf.get(bytes, off, len);
        return len;
    }
}

OutputStream

public class ByteBufferBackedOutputStream extends OutputStream {
    ByteBuffer buf;

    public ByteBufferBackedOutputStream(ByteBuffer buf) {
        this.buf = buf;
    }

    public void write(int b) throws IOException {
        buf.put((byte) b);
    }

    public void write(byte[] bytes, int off, int len)
            throws IOException {
        buf.put(bytes, off, len);
    }

}
Kothar
  • 6,579
  • 3
  • 33
  • 42
  • Thank you @Mike for a code. I have a suggestion. I think it is better to add `@Override public void flush() throws IOException { buf.flip(); }` method to ByteBufferBackedOutputStream code. – denys May 08 '12 at 12:49
  • 1
    Why would you make it synchronized? Do you expect multiple threads to read the same inputstream? – Nim May 23 '13 at 13:46
  • That's a very good point, more copy & paste laziness there. I will edit to remove, as it's not part of the InputStream/OutputStream API contract. – Kothar May 29 '13 at 09:03
  • 1
    @denys, sorry, only just noticed your comment - why do you want the `flush` to have that effect? It seems like a `flip` would be confusing, since it would overwrite earlier data, which isn't what `flush()` normally does. I assume you are trying to use a single buffer wrapped in both a input and output stream as a buffer? – Kothar May 29 '13 at 09:14
  • You are right @MikeHouston it is better to name it `flip()`. Yes I'm using the same buffer for consumer and reader. – denys May 29 '13 at 11:09
  • `InputStream` and `OutputStream` only have one abstract method each. Why override two? – jaco0646 Mar 30 '17 at 23:16
  • 1
    @jaco0646 While it's true you only _need_ to implement the single abstract method, the default implementation of the other method is implemented in terms of read(int) and write(int) so it contains a loop: `for (int i = 0 ; i < len ; i++) { write(b[off + i]); }` For more efficiency, we can pass the byte array through to the the buffer directly and avoid converting to/from int values and making one function call for each byte. – Kothar Apr 05 '17 at 14:01
  • 1
    maybe you can remove `throws IOException` from method signature because the actual implementation never throws these exceptions. – Robert Nov 05 '17 at 12:25
  • 1
    Depending on how you allocated your `ByteBuffer`, you may want to override `InputStream#close()` to free or release the `ByteBuffer` too. – Luke Hutchison Jun 14 '18 at 01:45
  • 2
    Should've implemented `int available()` too. – Andreas is moving to Codidact Jun 16 '20 at 18:54
16

Nothing in the JDK, but there are lots of implementations out there, google for ByteBufferInputStream. Basically they wrap one or more ByteBuffers and keep track of an index into them that records how much has already been read. Something like this comes up a lot, but apparently is buggy, see @Mike Houston's answer for an improved version).

Community
  • 1
  • 1
Thilo
  • 257,207
  • 101
  • 511
  • 656
7

If it's backed by a byte array, you can use a ByteArrayInputStream and get the byte array via ByteBuffer.array(). This will throw an exception if you're trying it on a native ByteBuffer.

EboMike
  • 76,846
  • 14
  • 164
  • 167
  • By "native ByteBuffer" do you mean a ByteBuffer object that was created via ByteBuffer.allocateDirect()? – BD at Rivenhill Feb 09 '12 at 06:50
  • 8
    This approach only works if you're sure you want to read the entire contents of the backing byte array. For cases where you have a partially-full buffer, you'll end up reading beyond the limit. – stevevls Nov 15 '12 at 20:24
  • 2
    This approach is wrong, because buffer contents may be only a part of the array, and the array will contain other data in the beginning and end. See get() method implementation. – Dmitry Risenberg Feb 25 '15 at 12:37
3

Use the heap buffer (byte array) directly if available, otherwise use wrapped bytebuffer (see answer Mike Houston)

public static InputStream asInputStream(ByteBuffer buffer) {
    if (buffer.hasArray()) {
        // use heap buffer; no array is created; only the reference is used
        return new ByteArrayInputStream(buffer.array());
    }
    return new ByteBufferInputStream(buffer);
}

Also note that the wrapped buffer can efficiently support the mark/reset and skip operations.

rmuller
  • 12,062
  • 4
  • 64
  • 92
  • 1
    Note that `.array()` is an optional operation. It may be unimplemented (e.g. `MappedByteBuffer`), and throws Exception for read only buffers even if implemented. – Sheepy Dec 18 '14 at 03:00
  • 1
    Indeed, that is why the `buffer.hasArray()` is there :) – rmuller Dec 18 '14 at 06:36
  • 1
    This will be ok if you always want your `InputStream` to be based on the whole array, but will not provide the desired results for streams with an offset. Same issues as [this answer](http://stackoverflow.com/a/4332427/836214) that was provided 4 years before yours... – Krease Apr 28 '15 at 20:45
  • @Chris First, OP did not ask for support of streams with an offset. Second, my answer was meant as an addition to the answer of Mike Houston (which is clearly stated in the text) – rmuller Apr 30 '15 at 08:10
  • OP asked for wrapping a `ByteBuffer` to access as a stream. `ByteBuffer` uses offsets to control which section of the underlying array is normally accessible to callers. That's part of the point of using `ByteBuffer` instead of just `byte[]` in the first place. – Krease Apr 30 '15 at 21:08
3

This is my version of InputStream & OutputStream implementation:

ByteBufferBackedInputStream:

public class ByteBufferBackedInputStream extends InputStream
{
  private ByteBuffer backendBuffer;

  public ByteBufferBackedInputStream(ByteBuffer backendBuffer) {
      Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!");
      this.backendBuffer = backendBuffer;
  }

  public void close() throws IOException {
      this.backendBuffer = null;
  }

  private void ensureStreamAvailable() throws IOException {
      if (this.backendBuffer == null) {
          throw new IOException("read on a closed InputStream!");
      }
  }

  @Override
  public int read() throws IOException {
      this.ensureStreamAvailable();
      return this.backendBuffer.hasRemaining() ? this.backendBuffer.get() & 0xFF : -1;
  }

  @Override
  public int read(@Nonnull byte[] buffer) throws IOException {
      return this.read(buffer, 0, buffer.length);
  }

  @Override
  public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException {
      this.ensureStreamAvailable();
      Objects.requireNonNull(buffer, "Given buffer can not be null!");
      if (offset >= 0 && length >= 0 && length <= buffer.length - offset) {
          if (length == 0) {
              return 0;
          }
          else {
              int remainingSize = Math.min(this.backendBuffer.remaining(), length);
              if (remainingSize == 0) {
                  return -1;
              }
              else {
                  this.backendBuffer.get(buffer, offset, remainingSize);
                  return remainingSize;
              }
          }
      }
      else {
          throw new IndexOutOfBoundsException();
      }
  }

  public long skip(long n) throws IOException {
      this.ensureStreamAvailable();
      if (n <= 0L) {
          return 0L;
      }
      int length = (int) n;
      int remainingSize = Math.min(this.backendBuffer.remaining(), length);
      this.backendBuffer.position(this.backendBuffer.position() + remainingSize);
      return (long) length;
  }

  public int available() throws IOException {
      this.ensureStreamAvailable();
      return this.backendBuffer.remaining();
  }

  public synchronized void mark(int var1) {
  }

  public synchronized void reset() throws IOException {
      throw new IOException("mark/reset not supported");
  }

  public boolean markSupported() {
      return false;
  }
}

ByteBufferBackedOutputStream:

public class ByteBufferBackedOutputStream extends OutputStream
{
    private ByteBuffer backendBuffer;

    public ByteBufferBackedOutputStream(ByteBuffer backendBuffer) {
        Objects.requireNonNull(backendBuffer, "Given backend buffer can not be null!");
        this.backendBuffer = backendBuffer;
    }

    public void close() throws IOException {
        this.backendBuffer = null;
    }

    private void ensureStreamAvailable() throws IOException {
        if (this.backendBuffer == null) {
            throw new IOException("write on a closed OutputStream");
        }
    }

    @Override
    public void write(int b) throws IOException {
        this.ensureStreamAvailable();
        backendBuffer.put((byte) b);
    }

    @Override
    public void write(@Nonnull byte[] bytes) throws IOException {
        this.write(bytes, 0, bytes.length);
    }

    @Override
    public void write(@Nonnull byte[] bytes, int off, int len) throws IOException {
        this.ensureStreamAvailable();
        Objects.requireNonNull(bytes, "Given buffer can not be null!");
        if ((off < 0) || (off > bytes.length) || (len < 0) ||
            ((off + len) > bytes.length) || ((off + len) < 0))
        {
            throw new IndexOutOfBoundsException();
        }
        else if (len == 0) {
            return;
        }

        backendBuffer.put(bytes, off, len);
    }
}
bob
  • 1,107
  • 10
  • 16
2

Based on a derivative of ByteArrayInputStream code ... Requires the supplied ByteBuffer to have position and limit correctly set in advance as appropriate.

    public class ByteBufferInputStream extends InputStream
    {
        /**
         * The input ByteBuffer that was provided.
         * The ByteBuffer should be supplied with position and limit correctly set as appropriate
         */
        protected ByteBuffer buf;

        public ByteBufferInputStream(ByteBuffer buf)
        {
            this.buf = buf;
            buf.mark(); // to prevent java.nio.InvalidMarkException on InputStream.reset() if mark had not been set
        }

        /**
         * Reads the next byte of data from this ByteBuffer. The value byte is returned as an int in the range 0-255.
         * If no byte is available because the end of the buffer has been reached, the value -1 is returned.
         * @return  the next byte of data, or -1 if the limit/end of the buffer has been reached.
         */
        public int read()
        {
            return buf.hasRemaining()
                ? (buf.get() & 0xff)
                : -1;
        }

        /**
         * Reads up to len bytes of data into an array of bytes from this ByteBuffer.
         * If the buffer has no remaining bytes, then -1 is returned to indicate end of file.
         * Otherwise, the number k of bytes read is equal to the smaller of len and buffer remaining.
         * @param   b     the buffer into which the data is read.
         * @param   off   the start offset in the destination array b
         * @param   len   the maximum number of bytes read.
         * @return  the total number of bytes read into the buffer, or -1 if there is no more data because the limit/end of
         *          the ByteBuffer has been reached.
         * @exception  NullPointerException If b is null.
         * @exception  IndexOutOfBoundsException If off is negative, len is negative, or len is greater than b.length - off
         */
        public int read(byte b[], int off, int len)
        {
            if (b == null)
            {
                throw new NullPointerException();
            }
            else if (off < 0 || len < 0 || len > b.length - off)
            {
                throw new IndexOutOfBoundsException();
            }

            if (!buf.hasRemaining())
            {
                return -1;
            }

            int remaining = buf.remaining();
            if (len > remaining)
            {
                len = remaining;
            }

            if (len <= 0)
            {
                return 0;
            }

            buf.get(b, off, len);

            return len;
        }

        /**
         * Skips n bytes of input from this ByteBuffer. Fewer bytes might be skipped if the limit is reached.
         *
         * @param   n   the number of bytes to be skipped.
         * @return  the actual number of bytes skipped.
         */
        public long skip(long n)
        {
            int skipAmount = (n < 0)
                ? 0
                : ((n > Integer.MAX_VALUE)
                ? Integer.MAX_VALUE
                : (int) n);

            if (skipAmount > buf.remaining())
            {
                skipAmount = buf.remaining();
            }

            int newPos = buf.position() + skipAmount;

            buf.position(newPos);

            return skipAmount;
        }

        /**
         * Returns remaining bytes available in this ByteBuffer
         * @return the number of remaining bytes that can be read (or skipped over) from this ByteBuffer.
         */
        public int available()
        {
            return buf.remaining();
        }

        public boolean markSupported()
        {
            return true;
        }

        /**
         * Set the current marked position in the ByteBuffer.
         * <p> Note: The readAheadLimit for this class has no meaning.
         */
        public void mark(int readAheadLimit)
        {
            buf.mark();
        }

        /**
         * Resets the ByteBuffer to the marked position.
         */
        public void reset()
        {
            buf.reset();
        }

        /**
         * Closing a ByteBuffer has no effect.
         * The methods in this class can be called after the stream has been closed without generating an IOException.
         */
        public void close() throws IOException
        {
        }
    }
0

After copy-pasting ByteBufferBackedInputStream and wanting to use it, IDE hints me that Jackson already has one. So I paste it here as a reference:

package com.fasterxml.jackson.databind.util;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/**
 * Simple {@link InputStream} implementation that exposes currently
 * available content of a {@link ByteBuffer}.
 */
public class ByteBufferBackedInputStream extends InputStream {
    protected final ByteBuffer _b;

    public ByteBufferBackedInputStream(ByteBuffer buf) { _b = buf; }

    @Override public int available() { return _b.remaining(); }
    
    @Override
    public int read() throws IOException { return _b.hasRemaining() ? (_b.get() & 0xFF) : -1; }

    @Override
    public int read(byte[] bytes, int off, int len) throws IOException {
        if (!_b.hasRemaining()) return -1;
        len = Math.min(len, _b.remaining());
        _b.get(bytes, off, len);
        return len;
    }
}
ch271828n
  • 15,854
  • 5
  • 53
  • 88