1

I'm launching a process using ProcessBuilder like so:

val pb = ProcessBuilder("/path/to/process")
pb.redirectErrorStream(true)
val proc = pb.start()

I'd like to do 2 things with the stdout of the process:

  1. Continually monitor its most recent line of output
  2. Log all lines to a file

As far as I can tell, in order to do both of these things I'll need to "split" the InputStream I get from proc.inputStream so that every line is mirrored to 2 other InputStreams: one that can be used to log to a file, and another to parse and monitor the status of the process.

One option would be to have a thread which reads from the InputStream fires an event with each line read to "subscribers", and I think this should work fine, but I was hoping to come up with a more generic "Tee" type functionality that would expose InputStreams to be consumed by whatever wanted to. Basically something like this:

val pb = ProcessBuilder("/path/to/process")
pb.redirectErrorStream(true)
val proc = pb.start()
val originalInputStream = proc.inputStream

val tee = Tee(originalInputStream)
// Every line read from originalInputStream would be 
// mirrored to all branches (not necessarily every line 
// from the beginning of the originalInputStream, but 
// since the start of the lifetime of the created branch)
val branchOne: InputStream = tee.addBranch()
val branchTwo: InputStream = tee.addBranch()

I took a shot at a Tee class, but I'm not sure what to do in the addBranch method:

class Tee(inputStream: InputStream) {
    val reader = BufferedReader(InputStreamReader(inputStream))
    val branches = mutableListOf<OutputStream>()

    fun readLine() {
        val line = reader.readLine()
        branches.forEach {
            it.write(line.toByteArray())
        }
    }

    fun addBranch(): InputStream {
        // What to do here?  Need to create an OutputStream
        // which readLine can write to, but return an InputStream
        // which will be updated with each future write to that
        // OutputStream
    }
}

EDIT: The implementation of Tee I ended up with was as follows:

/**
 * Reads from the given [InputStream] and mirrors the read
 * data to all of the created 'branches' off of it.
 * All branches will 'receive' all data from the original
 * [InputStream] starting at the the point of
 * the branch's creation.
 * NOTE: This class will not read from the given [InputStream]
 * automatically, its [read] must be invoked
 * to read the data from the original stream and write it to
 * the branches
 */
class Tee(inputStream: InputStream) {
    val reader = BufferedReader(InputStreamReader(inputStream))
    var branches = CopyOnWriteArrayList<OutputStream>()

    fun read() {
        val c = reader.read()

        branches.forEach {
            // Recreate the carriage return so that readLine on the
            // branched InputStreams works
            it.write(c)
        }
    }

    fun addBranch(): InputStream {
        val outputStream = PipedOutputStream()
        branches.add(outputStream)
        return PipedInputStream(outputStream)
    }
}
bbaldino
  • 394
  • 3
  • 15

2 Answers2

2

Take a look at the org.apache.commons.io.output.TeeInputStream from Apache Commons then you don't need to bother writing your own.

val pb = ProcessBuilder("/path/to/process")
pb.redirectErrorStream(true)
val proc = pb.start()
val original = proc.inputStream

val out = new PipedOutputStream()
val in = new PipedInputStream()
out.connect(in)

val tee = new TeeInputStream(in, out)

Then just read from tee instead of original, and any bytes read will be also written to out. By using the Piped streams, the data written to out will be made available to be read via in and so now you can have two threads reading from in and tee independently. One thread writing to logs, and one thread monitoring lines.

sksamuel
  • 16,154
  • 8
  • 60
  • 108
  • I did look at the TeeInputStream from commons but actually thought it didn't meet my use case as it only appeared to be a single proxy for the original stream, but I hadn't realized the intention was to continue reading from the original one as well, so that's good to know! But that takes in a pre-created OutputStream, and I wasn't sure how to create that to eventually get another InputStream. I looked at PipedInput/OutputStream, but my understanding was `out.connect(in)` only gives you the direction of out -> in, not in -> out – bbaldino Feb 27 '18 at 17:05
  • 1
    I looked closer at PipedInputStream/PipedOutputStream and this path ended up getting me where I needed. Thanks @monkjack! – bbaldino Feb 27 '18 at 22:28
0

Looks like simple decorator will be enough for you:

class Tee(private vararg val branches: OutputStream) : OutputStream() {
    override fun write(b: Int) {
        for (branch in branches) {
            branch.write(b)
        }
    }

    override fun write(b: ByteArray?) {
        for (branch in branches) {
            branch.write(b)
        }
    }

    override fun write(b: ByteArray?, off: Int, len: Int) {
        for (branch in branches) {
            branch.write(b,off, len)
        }
    }

    override fun flush() {
        for (branch in branches) {
            branch.flush()
        }
    }

    override fun close() {
        for (branch in branches) {
            branch.close()
        }
    }
}

And then you can just copy your input stream to Tee, which, underneath, can do anything — write to file, parse input and so on.

If I understand correctly, you need to parse data line by line, so you can add one else implementation of output steam, which, in reality, will parse input data and do what you need.

Also, please take a look at this answer. Possibly it's what you need if you don't want to deal with multiple output streams.


Also I think you can combine both technics to gain even more power — write to several output streams and parse data at te same time, for example.

asm0dey
  • 2,841
  • 2
  • 20
  • 33
  • The answer you linked to is the type of solution I mentioned in the question...workable but I was hoping for something built around `InputStream`. I don't quite follow your response though, where's the decorator? And it takes in an OutputStream, but I'm not sure how to create an OutputStream to eventually get an InputStream to be used by other readers. – bbaldino Feb 27 '18 at 17:06
  • @bbaldino Tee decorates other output streams. And that's correct, if you want to parse input - you have to create decorator over input steam and create listener interface – asm0dey Feb 27 '18 at 17:13
  • Ok, I see what you mean by decorator. I know how I can achieve this with the listener interface, but I wanted to know if there was a way such that the consumer can operate on it as an `InputStream` type, just like it would the original stream (as opposed to the listener interface). – bbaldino Feb 27 '18 at 17:32
  • @bbaldino won't it be easier for you to just open to simulationeuos input streams (i.e. in threads) and do all the dirty work there? Then just join threads? And of course it's not possible OOTB — what is read from stream can't be read again :) There is always one more option — use [Channels](https://docs.oracle.com/javase/7/docs/api/java/nio/channels/Channels.html#newChannel(java.io.InputStream)). Read portion into ByteBuffer, do what you want with it and read more. It's almost perfect because almost zero-overhead, but will need some custom code. – asm0dey Feb 27 '18 at 18:16
  • @asmodey unfortunately i don't think i can...the InputStream is pre-created (it comes from a `Process` instance). I took a look at Channels but haven't figured out a way to get what I need yet. Part of the problem is that since the InputStream is "infinite", anything based on a byte array has some issues since, from what I've seen, the readers coming from a byte-array make a copy and expect all the data to already be there. – bbaldino Feb 27 '18 at 21:21
  • @bbaldino no, they don't need full copy of data. All these pretty little things with channels work inside high performance servers. So no, you can get done data and write it where you want. And you can reuse bytebuffer infinitely – asm0dey Feb 27 '18 at 21:25