197

What's the correct way to handle errors with streams? I already know there's an 'error' event you can listen on, but I'd like to know more about how to handle arbitrarily complicated situations.

For starters, what do you do when you want to do a simple pipe chain:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

And how do you properly create one of those transforms so that errors are handled correctly?

More related questions:

  • when an error happens, what happens to the 'end' event? Does it never get fired? Does it sometimes get fired? Does it depend on the transform/stream? What are the standards here?
  • are there any mechanisms for propagating errors through the pipes?
  • do domains solve this problem effectively? Examples would be nice.
  • do errors that come out of 'error' events have stack traces? Sometimes? Never? Is there a way to get one from them?
B T

9 Answers

262

transform

Transform streams are both readable and writable, and thus are really good 'middle' streams. For this reason, they are sometimes referred to as through streams. They are in fact a kind of duplex stream (Transform inherits from Duplex), except that they provide a nice interface for manipulating the data rather than just sending it through. The purpose of a transform stream is to manipulate the data as it is piped through the stream. You may want to do some async calls, for example, or derive a couple of fields, remap some things, etc.


[Figure: where you might put a transform stream]


For how to create a transform stream, see here and here. All you have to do is:

  1. include the stream module
  2. instantiate (or inherit from) the Transform class
  3. implement a _transform method which takes (chunk, encoding, callback).

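The chunk is your data. Most of the time you won't need to worry about encoding if you are working in objectMode: true. Call the callback when you are done processing the chunk. Whatever you pass to this.push(data), or as the second argument of callback(null, data), is what gets pushed on to the next stream; passing an error as the first argument, callback(err), reports a failure on the stream instead.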
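The three steps above might look like this in practice. This is a minimal sketch, not code from the linked articles: `UpperCase` and `upperCase` are made-up names, and the example assumes plain (non-objectMode) text chunks.

```javascript
// Step 1: include the stream module
const { Transform } = require('stream');

// Step 2: inherit from the Transform class
class UpperCase extends Transform {
  // Step 3: implement _transform(chunk, encoding, callback)
  _transform(chunk, encoding, callback) {
    // Outside objectMode, chunk is normally a Buffer.
    const result = chunk.toString().toUpperCase();
    // The second argument is pushed to the readable side;
    // passing an Error as the first argument would report a failure instead.
    callback(null, result);
  }
}

// Equivalent without subclassing: pass a transform function in the options.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});
```

Either version can be used like any other stream, e.g. `process.stdin.pipe(new UpperCase()).pipe(process.stdout)`.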

If you want a helper module that makes writing through streams really easy, I suggest through2.

For error handling, keep reading.

pipe

In a pipe chain, handling errors is indeed non-trivial. According to this thread, .pipe() is not built to forward errors. So something like ...

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

... would only listen for errors on the stream c. If an error event were emitted on a, it would not be passed down; and since a has no 'error' listener, it would in fact be thrown as an uncaught exception. To do this correctly:

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

Now, though the second way is more verbose, you can at least keep the context of where your errors happen. This is usually a good thing.
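A runnable version of the per-stream approach, using in-memory streams in place of `createStream()`, `b` and `c` (the `label` helper and the error message are made up for illustration). An error on `a` reaches only `a`'s own listener; it is not forwarded to `b` or `c`.

```javascript
const { Readable, PassThrough } = require('stream');

const seen = [];

// Returns an 'error' listener that records which stream failed.
function label(name) {
  return (err) => seen.push(`${name}: ${err.message}`);
}

const a = new Readable({
  read() {
    // Simulate a failing source.
    this.destroy(new Error('source broke'));
  },
});
const b = new PassThrough();
const c = new PassThrough();

a.on('error', label('a'))
  .pipe(b)
  .on('error', label('b'))
  .pipe(c)
  .on('error', label('c'));
```

Also note that `.pipe()` does not close `b` when `a` fails; the Node.js docs state that each stream then has to be closed manually to avoid leaks.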

If you only want to capture errors at the destination and don't care so much about where they happened, one library I find helpful is event-stream.

end

When an error event is fired, the end event will not be fired (explicitly). The emitting of an error event will end the stream.
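In current Node.js, how this plays out depends on how the error is raised, which may explain the disagreement in the comments below. A quick check, assuming Node.js v10 or later: a stream destroyed with `destroy(err)` emits 'error' and then 'close', but never 'end'; an 'error' event emitted by hand with `emit('error', ...)` neither ends nor destroys the stream.

```javascript
const { Readable } = require('stream');

const events = [];

// Stream 1: failed the usual way, via destroy(err).
const destroyed = new Readable({ read() {} });
destroyed.on('end', () => events.push('end'));
destroyed.on('error', (err) => events.push(`error: ${err.message}`));
destroyed.on('close', () => events.push('close'));
destroyed.destroy(new Error('boom'));

// Stream 2: an 'error' event emitted by hand; the stream itself is untouched.
const manual = new Readable({ read() {} });
manual.on('error', () => {});
manual.emit('error', new Error('manual'));
console.log(manual.destroyed); // false: still open, and 'end' has not happened
```

Once the queued events have fired, `events` holds `'error: boom'` followed by `'close'`, with no `'end'`.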

domains

In my experience, domains work really well most of the time. If you have an unhandled error event (i.e. emitting an error on a stream without a listener), the server can crash. Now, as the above article points out, you can wrap the stream in a domain which should properly catch all errors.

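var domain = require('domain');
var fs = require('fs');
var zlib = require('zlib');
var tar = require('tar'); // third-party; tar.Extract is the older node-tar API

var d = domain.create();
d.on('error', handleAllErrors);
d.run(function() {
  fs.createReadStream(tarball)
    .pipe(zlib.createGunzip())
    .pipe(tar.Extract({ path: targetPath }))
    .on('close', cb);
});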

The beauty of domains is that they preserve the stack traces, though event-stream does a good job of this as well.

For further reading, check out the stream-handbook¹. It's pretty in-depth, but super useful, and it gives some great links to lots of helpful modules.

1: Note: this link points to archive.org as the original GitHub repo was deleted around August, 2022.

JDB
mshell_lauren
  • That's really great info, thanks! Could you add a little bit about why you'd want to create a transform stream and why it relates to my question? – B T Mar 14 '14 at 00:55
  • Sure - though I figured it related since you asked about it ; ) – mshell_lauren Mar 14 '14 at 18:22
  • Post on this by isaacs on Google Groups (nodejs): https://groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ (not grokbase) – jpillora May 08 '14 at 08:44
  • This answer is written perfectly. I'm going to investigate the domain suggestion -- it appears to be the kind of solution I was looking for. – Semicolon Jan 25 '15 at 02:54
  • Note that you do not need to wrap the `.on('error')` handler in an anonymous function i.e. `a.on('error', function(e){handleError(e)})` can just be `a.on('error', handleError)` – timoxley Sep 24 '15 at 09:37
  • "The emitting of an error event will end the stream" So, does it mean that I can't really recover from a streaming error? – Vanuan May 01 '16 at 11:06
  • This isn't true (anymore!?): _When an error event is fired, the end event will not be fired (explicitly). The emitting of an error event will end the stream._ ... see https://www.bennadel.com/blog/2678-error-events-don-t-inherently-stop-streams-in-node-js.htm. Relying on posts like that I had to explicitly `stream.end()` my passthrough stream to end it after emitting some `error` event. And due to that I got an `end` event, too. – Thomas Urban Mar 04 '18 at 10:31
  • The `domain` module is being *deprecated*: https://nodejs.org/api/domain.html – famzah Sep 01 '21 at 05:08
45

If you are using Node >= v10.0.0, you can use stream.pipeline and stream.finished.

For example:

const { pipeline, finished } = require('stream');

pipeline(
  input, 
  transformA, 
  transformB, 
  transformC, 
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});


finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

See this github PR for more discussion.
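A self-contained variant of the example above that can be run as-is: in-memory streams stand in for `input`, `transformA` and the destination, and `transformA` is a made-up transform that fails on its second chunk by passing an Error to its callback. Assumes Node.js >= 12.3 for `Readable.from`.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

const input = Readable.from(['a', 'b', 'c']);

// Hypothetical transform that fails on its second chunk by passing an
// Error to its callback instead of throwing.
let seenChunks = 0;
const transformA = new Transform({
  transform(chunk, encoding, callback) {
    seenChunks += 1;
    if (seenChunks === 2) {
      callback(new Error('bad chunk'));
      return;
    }
    callback(null, chunk);
  },
});

// Sink that simply accepts whatever reaches the end of the chain.
const output = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

let result;
pipeline(input, transformA, output, (err) => {
  // Called once: with the first error from any stream, or without one on success.
  result = err ? `failed: ${err.message}` : 'succeeded';
  console.log(`Pipeline ${result}`);
});
```

The final callback runs once, with the first error raised by any stream in the chain, and `pipeline` destroys the other streams for you, which is the main advantage over chained `.pipe()` calls. On Node.js >= 15 the promise version, `require('stream/promises').pipeline`, can be awaited inside try/catch; on older versions `util.promisify(stream.pipeline)` gives the same effect.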

shusson
26

Domains are deprecated; you don't need them.

For this question, the distinction between transform and writable streams is not so important.

mshell_lauren's answer is great, but as an alternative you can also explicitly listen for the error event on each stream you think might error, and reuse the handler function if you prefer.

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }

Doing so prevents the infamous uncaught exception should one of those streams fire its error event.
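Note that a listener only prevents the crash: when one stream fails, `.pipe()` does not close the others, so they have to be cleaned up by hand. A minimal sketch that reuses one handler and destroys every stream on the first error; `handleAll` is a hypothetical helper, not a Node.js API, and the streams are in-memory stand-ins.

```javascript
const { Readable, PassThrough, Writable } = require('stream');

// Hypothetical helper: attach one shared 'error' listener to every stream and,
// on the first error, destroy all of them so nothing is left half-open.
function handleAll(streams, handler) {
  let failed = false;
  for (const stream of streams) {
    stream.on('error', (err) => {
      if (failed) return; // report only the first error
      failed = true;
      for (const s of streams) s.destroy(); // no-op for already-destroyed streams
      handler(err);
    });
  }
}

const errors = [];
const a = new Readable({ read() {} });
const b = new PassThrough();
const c = new Writable({ write(chunk, encoding, callback) { callback(); } });

handleAll([a, b, c], (err) => errors.push(err.message));
a.pipe(b).pipe(c);

// Simulate a failure in the source stream.
a.destroy(new Error('read failed'));
```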

Bent Cardan
  • lol have fun handling 3 different error events and pray that whoever wrote the 3 different streaming libs implemented error handling correctly – Alexander Mills Mar 25 '16 at 18:25
  • @Alex Mills 1) What is the problem of handling 3 events, and why are they "different", when their type is the same -- `error`, one may as well then settle with the fact that each event is distinct; 2) what streaming libs are written above, other than native Node.js functionality? and 3) why does it matter how they handle events internally, when this obviously allows anyone to attach additional error handlers on top of whatever is already there? – Armen Michaeli Apr 14 '16 at 19:37
  • But are the other streams also closed if there is an error in one stream? – fishbone May 17 '22 at 13:21
13

Errors from the whole chain can be propagated to the rightmost stream using a simple function:

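function safePipe (readable, transforms) {
    // Forward each stream's errors to the next stream, so every error
    // eventually reaches the last (rightmost) stream in the chain.
    // Iterates without shift(), so the caller's array is not mutated.
    for (const next of transforms) {
        const prev = readable;
        prev.on("error", function (e) { next.emit("error", e); });
        prev.pipe(next);
        readable = next;
    }
    return readable;
}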

which can be used like:

safePipe(readable, [ transform1, transform2, ... ])
    .on("error", handleError); // the returned (last) stream needs an 'error' listener
Gleba
6

.on("error", handler) only takes care of stream errors, but if you are using custom Transform streams, .on("error", handler) doesn't catch errors thrown inside the _transform function. So one can do something like this to control the application flow:

The this keyword in the _transform function refers to the stream itself, which is an EventEmitter. So you can use try/catch as below to catch the errors and then pass them on to custom event handlers.

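// CustomTransform.js
var Transform = require('stream').Transform
var util = require('util')

function CustomTransformStream (options) {
  Transform.call(this, options)
}
util.inherits(CustomTransformStream, Transform)

CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code, e.g. stream.push(result)
  } catch (e) {
    // Emit a different custom event depending on the error type
    if (e instanceof TypeError) {
      stream.emit("CTError1", e)
    } else {
      stream.emit("CTError2", e)
    }
  }
  done()
}

module.exports = CustomTransformStream

// StreamImplementation.js
var CustomTransformStream = require('./CustomTransform')

someReadStream
  .pipe(new CustomTransformStream()) // pipe an instance, not the constructor
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /* Let's do something else */ })
  .pipe(someWriteStream)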

This way, you can keep your logic and error handlers separate. Also, you can opt to handle only some errors and ignore others.

UPDATE: an alternative is to use RxJS Observables.

Vikas Gautam
5

Use the multipipe package to combine several streams into one duplex stream and handle errors in one place.

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 


// centralized error handling
stream.on('error', fn)
2

Use the Node.js pattern: create a Transform stream and call its done callback with the error as its argument in order to propagate the error:

var stream = require('stream');

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);

// Assign _transform on the instance itself (an instance has no .prototype)
transformStream1._transform = function (chunk, encoding, done) {
  try {
    // Do your transform code
    /* ... */
  } catch (error) {
    // Node.js style for propagating an error: it becomes an 'error' event
    return done(error);
  }

  // Here, everything went well (use done(null, result) to push a result)
  done();
};

// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
  .pipe(transformStream1)
  .on('error', function (error) {
    console.error('Error in transformStream1:');
    console.error(error);
    process.exit(1);
  })
  .pipe(someWriteStream)
  .on('close', function () {
    console.log('OK.');
    process.exit();
  })
  .on('error', function (error) {
    console.error(error);
    process.exit(1);
  });
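A runnable sketch of the same done(error) pattern, using a made-up JSON-parsing transform as the "transform code": a `JSON.parse` failure is caught and handed to `done`, and Node.js then emits it as an 'error' event on the transform stream. It uses the options form of `stream.Transform` instead of assigning `_transform`, and assumes Node.js >= 12.3 for `Readable.from`.

```javascript
const { Readable, Transform } = require('stream');

// Parses each incoming text chunk as JSON and outputs the resulting objects.
const parseJson = new Transform({
  readableObjectMode: true, // output side carries objects, input side carries text
  transform(chunk, encoding, done) {
    let parsed;
    try {
      parsed = JSON.parse(chunk.toString());
    } catch (error) {
      // Node.js style: hand the error to the callback instead of throwing.
      return done(error);
    }
    done(null, parsed);
  },
});

const errorNames = [];
parseJson.on('error', (error) => errorNames.push(error.name));

Readable.from(['{"ok":true}', 'not json']).pipe(parseJson);
```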
Derek
  • Hmm, so you're saying if all stream processors were built like this, errors would propagate? – B T Sep 28 '17 at 20:21
  • Thank you. For stream transformers this seems to be the correct way. The error gets propagated by stream.pipeline, which can be very easily wrapped in a promise. – AWS User Sep 04 '21 at 20:18
2
const http = require('http');
const fs = require('fs');
const server = http.createServer();

server.on('request',(req,res)=>{
    const readableStream = fs.createReadStream(__dirname+'/README.md');
    const writeableStream = fs.createWriteStream(__dirname+'/assets/test.txt');
    readableStream
    .on('error',()=>{
        res.end("File not found")
    })
    .pipe(writeableStream)
    .on('error',(error)=>{
        console.log(error)
        res.end("Something went wrong!")
    })
    .on('finish',()=>{
        res.end("Done!")
    })
})

server.listen(8000,()=>{
    console.log("Server is running on port 8000")
})
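For comparison, here is a sketch of the same copy using `stream.pipeline` (Node.js >= 10), so a single callback sees errors from either file stream and both streams are cleaned up on failure. `copyForRequest` is a hypothetical helper; `res` can be anything with an `end()` method, such as the `http.ServerResponse` above.

```javascript
const fs = require('fs');
const { pipeline } = require('stream');

// Copies srcPath to destPath and reports the outcome through res.end().
function copyForRequest(srcPath, destPath, res) {
  pipeline(fs.createReadStream(srcPath), fs.createWriteStream(destPath), (err) => {
    if (err) {
      console.error(err);
      // fs errors carry the path that failed, so a missing source can be told apart
      const missingSource = err.code === 'ENOENT' && err.path === srcPath;
      res.end(missingSource ? 'File not found' : 'Something went wrong!');
    } else {
      res.end('Done!');
    }
  });
}
```

Inside the request handler above, it would be called as `copyForRequest(__dirname + '/README.md', __dirname + '/assets/test.txt', res)`.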
Soura Ghosh
  • I'm pretty sure this does not properly catch errors that happen when piping to writeableStream. – B T Nov 27 '21 at 19:50
  • @BT, I have edited my post according to your comments, and I believe we can now handle any error that occurs while piping to writeableStream. – Soura Ghosh Dec 04 '21 at 11:35
-1

A try/catch won't capture errors that occur in the stream, because they are raised after the calling code has already exited. You can refer to the documentation:

https://nodejs.org/dist/latest-v10.x/docs/api/errors.html
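To make the distinction concrete: a try/catch around `.pipe()` calls never sees stream errors, because they are emitted later as events. It does work once the stream work is wrapped in a promise that you `await`, for example with the promise-based `pipeline` from `stream/promises` (Node.js >= 15). A minimal sketch with a made-up failing source:

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises'); // promise API, Node.js >= 15

async function copyWithTryCatch() {
  // Hypothetical source that fails on its first read.
  const source = new Readable({
    read() {
      this.destroy(new Error('read failed'));
    },
  });
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });

  try {
    await pipeline(source, sink);
    return 'ok';
  } catch (err) {
    // Reached because the awaited promise rejects with the stream's error.
    return `caught: ${err.message}`;
  }
}

copyWithTryCatch().then((outcome) => console.log(outcome));
```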

Mehran
  • Thanks, but this doesn't answer the question at all. – B T Apr 17 '19 at 21:53
  • Giving me a 40 page document is not helpful. What do you think I should refer to in that giant page? Also, have you read my question? My question is not "does try catch work with streams?" I'm already well aware that try-catch won't work with asynchronous errors, eg ones from stream processing pipelines. – B T Apr 22 '19 at 23:17