I've written background InputStream (and OutputStream) implementations that wrap other streams and read ahead on a background thread, primarily so that decompression/compression can happen on a different thread from the one processing the decompressed stream.

It's a fairly standard producer/consumer model.

This seems like an easy way to make good use of multi-core CPUs with simple processes that read, process, and write data, allowing for more efficient use of both CPU and disk resources. Perhaps 'efficient' isn't the best word, but it provides higher utilisation, and of more interest to me, reduced runtimes, compared to reading directly from a ZipInputStream and writing directly to a ZipOutputStream.

I'm happy to post the code, but my question is whether I'm reinventing something readily available in existing (and more heavily exercised) libraries?

Edit - posting code...

My code for the BackgroundInputStream is below (the BackgroundOutputStream is very similar), but there are aspects of it that I'd like to improve.

  1. It looks like I'm working far too hard to pass buffers back and forward.
  2. If the calling code throws away references to the BackgroundInputStream, the backgroundReaderThread will hang around forever.
  3. Signalling eof needs improving.
  4. Exceptions should be propagated to the foreground thread.
  5. I'd like to allow using a thread from a provided Executor.
  6. The close() method should signal the background thread, and shouldn't close the wrapped stream, as the wrapped stream should be owned by the background thread that reads from it.
  7. Doing silly things like reading after closing should be catered for appropriately.

package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream {
    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
    private final int queueSize;
    private final int bufferSize;
    private volatile boolean eof = false;
    private LinkedBlockingQueue<byte[]> bufferQueue;
    private final InputStream wrappedInputStream;
    private byte[] currentBuffer;
    private volatile byte[] freeBuffer;
    private int pos;

    public BackgroundInputStream(InputStream wrappedInputStream) {
        this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
    }

    public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
        this.wrappedInputStream = wrappedInputStream;
        this.queueSize = queueSize;
        this.bufferSize = bufferSize;
    }

    @Override
    public int read() throws IOException {
        if (bufferQueue == null) {
            bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            BackgroundReader backgroundReader = new BackgroundReader();
            Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
            backgroundReaderThread.start();
        }
        if (currentBuffer == null) {
            try {
                if ((!eof) || (bufferQueue.size() > 0)) {
                    currentBuffer = bufferQueue.take();
                    pos = 0;
                } else {
                    return -1;
                }
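            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail the read, rather than
                // falling through and dereferencing a null currentBuffer.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }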
        }
        // Mask to 0..255 so that a 0xFF byte is not returned as -1 (end of stream).
        int b = currentBuffer[pos++] & 0xFF;
        if (pos == currentBuffer.length) {
            freeBuffer = currentBuffer;
            currentBuffer = null;
        }
        return b;
    }

    @Override
    public int available() throws IOException {
        if (currentBuffer == null) return 0;
        // Only the unread remainder of the current buffer is available without blocking.
        return currentBuffer.length - pos;
    }

    @Override
    public void close() throws IOException {
        wrappedInputStream.close();
        currentBuffer = null;
        freeBuffer = null;
    }

    class BackgroundReader implements Runnable {

        @Override
        public void run() {
            try {
                while (!eof) {
                    byte[] newBuffer;
                    if (freeBuffer != null) {
                        newBuffer = freeBuffer;
                        freeBuffer = null;
                    } else {
                        newBuffer = new byte[bufferSize];
                    }
                    int bytesRead = 0;
                    int writtenToBuffer = 0;
                    while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
                        writtenToBuffer += bytesRead;
                    }
                    if (writtenToBuffer > 0) {
                        if (writtenToBuffer < bufferSize) {
                            newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                        }
                        bufferQueue.put(newBuffer);
                    }
                    if (bytesRead == -1) {
                        eof = true;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
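
As a rough sketch of where items 3, 4 and 5 might go (not tested beyond a trivial case): end of stream is signalled by a sentinel buffer on the queue instead of a shared flag, an IOException from the background read is rethrown from read(), and the background task runs on a caller-supplied Executor. The class name ReadAheadInputStream and its constructor are made up for illustration, it assumes Java 8 for the lambda, and it does nothing about item 2 (an abandoned stream still leaves the task blocked on a full queue).

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

// Hypothetical sketch: the class name and constructor shape are illustrative only.
class ReadAheadInputStream extends InputStream {
    // Sentinel compared by identity; queued exactly once, after the last data buffer.
    private static final byte[] EOF = new byte[0];

    private final BlockingQueue<byte[]> queue;
    private volatile IOException failure; // written by the reader before it queues EOF
    private byte[] current;
    private int pos;
    private boolean finished;

    ReadAheadInputStream(InputStream in, Executor executor, int queueSize, int bufferSize) {
        queue = new ArrayBlockingQueue<>(queueSize);
        executor.execute(() -> {
            boolean interrupted = false;
            try {
                byte[] buf = new byte[bufferSize];
                int n;
                while ((n = in.read(buf)) != -1) {
                    if (n > 0) {
                        queue.put(Arrays.copyOf(buf, n)); // blocks while the queue is full
                    }
                }
            } catch (IOException e) {
                failure = e;
            } catch (InterruptedException e) {
                interrupted = true;
                failure = new IOException("Background reader interrupted", e);
            }
            // Always queue the sentinel so the foreground thread cannot wait forever.
            while (true) {
                try {
                    queue.put(EOF);
                    break;
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @Override
    public int read() throws IOException {
        if (finished) {
            return -1; // also covers reads after the end has already been reported
        }
        while (current == null || pos == current.length) {
            try {
                current = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
            pos = 0;
            if (current == EOF) {
                finished = true;
                current = null;
                if (failure != null) {
                    throw failure; // surface the background error on the foreground thread
                }
                return -1;
            }
        }
        return current[pos++] & 0xFF;
    }
}
```

Usage would be along the lines of `new ReadAheadInputStream(new GZIPInputStream(fileIn), executor, 2, 64 * 1024)`, with the executor owned by the caller. Because the sentinel travels through the same queue as the data, the foreground thread never has to combine a flag check with a queue-size check.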
Stephen Denne
  • Did you find your answer? Is this available in any of the existing libraries? – adi Oct 17 '13 at 09:39
  • @adi I think the simplicity of single threaded solutions means it is safest to make use of multiple cores by processing multiple files at once, each in a single thread, rather than using multiple threads to speed the processing of a single file. If you've only got one file to process, you can probably bear to wait the extra time required to process it in a single thread. – Stephen Denne Oct 21 '13 at 06:18

2 Answers

Sounds interesting. I've never run across anything that does this out of the box but it makes perfect sense to try and use an idle core for the compression if it's available.

Perhaps you could make use of Commons I/O - it is a well tested lib which could help handle some of the more boring stuff and let you focus on extending the cool parallel parts. Maybe you could even contribute your code to the Commons project ;-)
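
If pulling in a library isn't an option, the JDK's own piped streams already cover the basic hand-off between threads. Here is a minimal sketch, assuming gzip-compressed input and Java 8; the names PipedDecompress and decompressInBackground are invented for illustration. It swallows background errors, so a failure shows up to the reader as a premature end of stream, which you'd want to improve.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.zip.GZIPInputStream;

// Hypothetical helper; only the java.io and java.util.zip classes are real API.
class PipedDecompress {
    // Returns a stream of decompressed bytes; inflation runs on a new background thread.
    static InputStream decompressInBackground(InputStream compressed, int pipeSize)
            throws IOException {
        PipedOutputStream sink = new PipedOutputStream();
        PipedInputStream source = new PipedInputStream(sink, pipeSize);
        Thread worker = new Thread(() -> {
            // The pipe is listed first so it is closed even if opening the gzip stream fails;
            // closing it is what lets the reading side see end of stream.
            try (PipedOutputStream out = sink;
                 InputStream gz = new GZIPInputStream(compressed)) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = gz.read(buf)) != -1) {
                    out.write(buf, 0, n); // blocks while the pipe buffer is full
                }
            } catch (IOException e) {
                // Limitation of this sketch: the error is dropped, so the reader
                // just sees the stream end early.
            }
        }, "background-gunzip");
        worker.setDaemon(true);
        worker.start();
        return source;
    }
}
```

The pipe's internal buffer plays the role of the queue: the worker blocks when it is full and the reader blocks when it is empty.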

jckdnk111

I'd be interested. I've thought through a similar project, but couldn't figure out how to handle pieces that finish compression out of order.

BobMcGee
  • I'm not breaking up the decompression or compression processes, I'm simply performing them on additional single threads. – Stephen Denne Feb 02 '10 at 09:55