
Premise

I'm trying to find the correct way to prematurely terminate a series of piped streams (a pipeline) in Node.js: sometimes I want to gracefully abort the stream before it has finished. Specifically, I'm dealing mostly with objectMode: true and non-native parallel streams, but this shouldn't really matter.

Problem

The problem is that when I unpipe the pipeline, data remains in each stream's buffer and is still drained. This might be okay for most of the intermediate streams (e.g. Readable/Transform), but the last Writable still drains to its write target (e.g. a file, a database, a socket, or whatever). This could be problematic if the buffer contains hundreds or thousands of chunks, which take a significant amount of time to drain. I want it to stop immediately, i.e. not drain; why waste cycles and memory on data that doesn't matter?

Depending on the route I go, I receive either a "write after end" error, or an exception when the stream cannot find existing pipes.
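
For reference, here is a minimal sketch of how the "write after end" variant can surface (the stream names and the deliberately slow sink are made up for illustration): ending the destination while data is still queued upstream means the pipe eventually writes into an ended stream.

var stream = require('stream');

// made-up object-mode streams: a source and a deliberately slow sink
var source = new stream.Readable({ objectMode: true, read: function() {} });
var slowSink = new stream.Writable({
  objectMode: true,
  write: function(chunk, encoding, callback) {
    setTimeout(callback, 50); // pretend each write to the file/db/socket takes 50ms
  }
});

slowSink.on('error', function(err) {
  console.error(err.message); // "write after end"
});

source.pipe(slowSink);
source.push({ id: 1 });
source.push({ id: 2 });

slowSink.end(); // prematurely end the destination while chunks are still queued;
                // when the pipe flushes them it writes into an ended stream and
                // the Writable emits "write after end"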

Question

What is the proper way to gracefully kill off a pipeline of streams in the form a.pipe(b).pipe(c).pipe(z)?

Solution?

The solution I have come up with is 3-step:

  1. Unpipe each stream in the pipeline, in reverse order
  2. Empty the buffer of each stream that implements Writable
  3. End each stream that implements Writable

Some pseudo code illustrating the entire process:

var pipeline = [ // define the pipeline
  readStream,
  transformStream0,
  transformStream1,
  writeStream
];

// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
    if ( !tmpBuildStream ) {
        tmpBuildStream = stream;
        return; // first stream is the source, nothing to pipe into it yet
    }
    tmpBuildStream = tmpBuildStream.pipe(stream); // pipe() returns the destination
});

// sleep, timeout, event, etc...

// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
    if ( !tmpTearStream ) {
        tmpTearStream = stream;
        return; // last stream is the sink, nothing is piped from it
    }
    stream.unpipe(tmpTearStream); // detach this stream from its destination
    tmpTearStream = stream;
});

// empty and end the pipeline
pipeline.forEach(function(stream) {
  if ( typeof stream._writableState === 'object' ) { // empty the internal write buffer (private/undocumented API)
    stream._writableState.length -= stream._writableState.buffer.length;
    stream._writableState.buffer = [];
  }
  if ( typeof stream.end === 'function' ) { // kill
    stream.end();
  }
});

I'm really worried about the usage of stream._writableState and modifying the internal buffer and length properties (the _ signifies a private property). This seems like a hack. Also note that since I'm piping, things like pause and resume are out of the question (based on a suggestion I received from IRC).

I also put together a runnable version (pretty sloppy) you can grab from github: https://github.com/zamnuts/multipipe-proto (git clone, npm install, view readme, npm start)

zamnuts
  • I'm also interested in how to preempt and halt a huge (multi-gigabyte) stream for efficiency (e.g. you just want to read the headers). – user949300 Mar 11 '15 at 21:24
  • As far as I know there is no official solution to clear a write stream. The only solution I can think of would be to write a custom transform stream which you could insert just before the write stream in the pipeline. This stream would implement its own buffering behavior, taking over that responsibility from the write stream. Because we own the buffering mechanism of this stream, we can build a method to clear it without resorting to hacks. The write stream should then get a very low highWaterMark, to minimize the data to write when we terminate. – Jasper Woudenberg Mar 17 '15 at 19:47
  • @JasperWoudenberg I think you're on to something there. Also, since writing this question, IIRC there have been releases that have fixed this buffering quirk. – zamnuts Mar 18 '15 at 18:08

1 Answer


In this particular case I think we should get rid of the structure where you have four different, not fully customised streams. Piping them together creates a chain of dependencies that will be hard to control unless we implement our own mechanism.

I would like to focus on your actual goal here:

 INPUT >----[read] → [transform0] → [transform1] → [write]-----> OUTPUT
               |          |              |            |
 KILL_ALL------o----------o--------------o------------o--------[nothing to drain]

I believe that the above structure can be achieved by combining custom:

  1. duplex stream - for your own _write(chunk, encoding, cb) and _read(size) implementations, with

  2. transform stream - for your own _transform(chunk, encoding, cb) implementation (a minimal sketch follows below).
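
A minimal sketch of what such a custom stream might look like - the killed flag and kill() method are hypothetical names used for illustration, not part of any existing package. Once killed, the transform acknowledges chunks without forwarding them, so downstream writables have nothing left to drain:

var Transform = require('stream').Transform;
var util = require('util');

// hypothetical "killable" transform: after kill() it drops every chunk
// instead of pushing it downstream
function KillableTransform(options) {
  Transform.call(this, options);
  this.killed = false;
}
util.inherits(KillableTransform, Transform);

KillableTransform.prototype._transform = function(chunk, encoding, callback) {
  if (this.killed) {
    return callback(); // acknowledge the chunk but never forward it
  }
  this.push(chunk);
  callback();
};

KillableTransform.prototype.kill = function() {
  this.killed = true;
};

// the KILL_ALL line from the diagram then becomes a simple broadcast:
// pipeline.forEach(function(s) { if (typeof s.kill === 'function') s.kill(); });

The same flag could be checked inside a custom _write() on the final stream, so the sink itself stops touching its write target as soon as it is killed.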

Since you are using the writable-stream-parallel package, you may also want to go over its source: their duplex implementation can be found here: https://github.com/Clever/writable-stream-parallel/blob/master/lib/duplex.js, and their transform stream implementation is here: https://github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js. Here they handle the highWaterMark.

Possible solution

Their write stream (https://github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189) has an interesting function, writeOrBuffer; I think you might be able to tweak it a bit to interrupt writing the data from the buffer.

Note: these three flags control the buffer clearing:

( !finished && !state.bufferProcessing && state.buffer.length )


Piotr Dajlido
  • 1) Piping the streams together allows me to have a pluggable architecture - the pipeline can be created, organized, and differ in length depending on user-definable filters/transforms/etc. 2) You're not very clear on the "custom combination" part, can you elaborate or reword? 3) The possible solution you mention is no better or worse than my suggested solution - I still have to patch code which I do not maintain. – zamnuts Mar 18 '15 at 18:06
  • Well I haven't implemented this solution yet, but I wanted to point out that you can take advantage of creating your own streams, which gives you good control of what is happening "under the hood", instead of hacking the already existing solution. This is a very exciting topic; as I went over the `writable-stream-parallel` docs I could see more and more places for control implementation. And about the piping: I agree that a pluggable architecture is great, but in your case I would advise building your own "plugs". – Piotr Dajlido Mar 18 '15 at 18:25