131

How to close a readable stream in Node.js?

var fs = require('fs');

var input = fs.createReadStream('lines.txt');

input.on('data', function(data) {
   // after closing the stream, this will not
   // be called again

   if (gotFirstLine) {
      // close this stream and continue the
      // instructions from this if
      console.log("Closed.");
   }
});

This would be better than:

var isEnded = false;

input.on('data', function(data) {
   if (isEnded) { return; }

   if (gotFirstLine) {
      isEnded = true;
      console.log("Closed.");
   }
});

But this would not stop the reading process...

Ionică Bizău

10 Answers

118

Edit: Good news! Starting with Node.js 8.0.0 readable.destroy is officially available: https://nodejs.org/api/stream.html#stream_readable_destroy_error

ReadStream.destroy

You can call the ReadStream.destroy function at any time.

var fs = require("fs");

var readStream = fs.createReadStream("lines.txt");
readStream
    .on("data", function (chunk) {
        console.log(chunk);
        readStream.destroy();
    })
    .on("end", function () {
        // This may not be called, since we destroy the stream
        // the first time a "data" event is received
        console.log("All the data in the file has been read");
    })
    .on("close", function (err) {
        console.log("Stream has been destroyed and file has been closed");
    });

The public ReadStream.destroy function is not documented (as of Node.js v0.12.2), but you can have a look at the source code on GitHub (Oct 5, 2012 commit).

The destroy function internally marks the ReadStream instance as destroyed and calls the close function to release the file.

You can listen to the close event to know exactly when the file is closed. The end event will not fire unless the data is completely consumed.


Note that the destroy (and close) functions are specific to fs.ReadStream. They are not part of the generic stream.Readable "interface".

Yves M.
  • At least in the latest version of Node (haven't checked the others), the file descriptor is [closed automatically](https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options). That said, I haven't done any kind of thorough test to ensure that the stream eventually fires `error` if it is never read. Aside from that, the only other leak I'd worry about is event handlers-- once again, I'm not 100% sure on this, but we might be ok b/c the 2010 the gospel of Isaacs does say that handlers are pruned when emitters are gc'd: https://groups.google.com/d/msg/nodejs/pXbJVo0NtaY/BxUmF_jp9LkJ – mikermcneil Dec 12 '15 at 22:48
  • 1
    If the data is too small, `on('data')` will only trigger once, so there will not be any `.close()`; just a reminder for someone else. – Shuai Li Mar 02 '19 at 02:03
  • 1
    you can actually use `this.destroy()` unless you're using an arrow function. Lexical `this` I hate you :D – Moha the almighty camel Sep 10 '21 at 17:14
  • 6
    I'm using `pipe()` and I had to move `on("close")` to be before `.pipe()` and `on("data")`, otherwise I wasn't able to catch the "close" event – Maxim Mazurok Mar 14 '22 at 12:09
  • @MaximMazurok you are a life saver brother, this had me spinning for hours, Thanks! – Fazal Karim Mar 23 '22 at 09:29
46

Invoke input.close(). It's not in the docs, but

https://github.com/joyent/node/blob/cfcb1de130867197cbc9c6012b7e84e08e53d032/lib/fs.js#L1597-L1620

clearly does the job :) It actually does something similar to your isEnded.

EDIT 2015-Apr-19 Based on comments below, and to clarify and update:

  • This suggestion is a hack, and is not documented.
  • Though, looking at the current lib/fs.js, it still works >1.5yrs later.
  • I agree with the comment below that calling destroy() is preferable.
  • As correctly stated below, this works for fs ReadStreams, not for a generic Readable

As for a generic solution: it doesn't appear as if there is one, at least from my understanding of the documentation and from a quick look at _stream_readable.js.

My proposal would be to put your readable stream in paused mode, at least preventing further processing in your upstream data source. Don't forget to unpipe() and remove all data event listeners so that pause() actually pauses, as mentioned in the docs.

Nitzan Shaked
  • 1
    Actually I would prefer calling `destroy` instead. At least that's what is called if you set autoClose to true. By looking at the source code (today) the differences are minimal (`destroy` calls `close`) but that could change in the future – Marcelo Diniz Mar 10 '14 at 11:44
  • Don't remember by now, but looks like it :) – Nitzan Shaked Jun 03 '14 at 20:49
  • 4
    There is no `close()` on the Readable object; is there a newer solution? My data exchange is always incomplete... – CodeManX Nov 04 '14 at 16:15
  • Updated to clarify, address comments, and provide a (poor man's) suggestion for the generic case. Though it does make some sense to not force a generic `readable` to implement `close()`, and provide a class-specific way of doing this (as is the case in `fs`, and presumably other classes implementing `Readable`) – Nitzan Shaked Apr 19 '15 at 19:06
  • Won't pausing cause the upstream (sender) to block due to backpressure, or otherwise cause buffers to grow until they exceed their limits? Ideally we would tell the sender that it is no longer wanted... – joeytwiddle Sep 27 '16 at 02:32
27

Today, in Node 10

readableStream.destroy()

is the official way to close a readable stream

see https://nodejs.org/api/stream.html#stream_readable_destroy_error

DevTheJo
14

You can't. There is no documented way to close/shutdown/abort/destroy a generic Readable stream as of Node 5.3.0. This is a limitation of the Node stream architecture.

As other answers here have explained, there are undocumented hacks for specific implementations of Readable provided by Node, such as fs.ReadStream. These are not generic solutions for any Readable though.

If someone can prove me wrong here, please do. I would like to be able to do what I'm saying is impossible, and would be delighted to be corrected.

EDIT: Here was my workaround: implementing .destroy() for my pipeline through a complex series of unpipe() calls. And after all that complexity, it doesn't work properly in all cases.

EDIT: Node v8.0.0 added a destroy() api for Readable streams.

thejoshwolfe
  • 1
    There’s now [`stream.pipeline`](https://nodejs.org/api/stream.html#stream_stream_pipeline_streams_callback), which claims to handle “forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.” Does that help? – andrewdotn Jul 15 '18 at 15:51
14

As of version 4.*.*, pushing a null value into the stream will trigger an EOF signal.

From the nodejs docs

If a value other than null is passed, the push() method adds a chunk of data into the queue for subsequent stream processors to consume. If null is passed, it signals the end of the stream (EOF), after which no more data can be written.

This worked for me after trying numerous other options on this page.

Jacob Lowe
  • 1
    Works for me. However, I needed to avoid calling the done() callback after pushing null to get the expected behavior - namely that the entire stream halts. – Rich Apodaca Jun 06 '16 at 20:45
7

This destroy module is meant to ensure a stream gets destroyed, handling different APIs and Node.js bugs. Right now it is one of the best choices.

NB. From Node 10 you can use the .destroy method without further dependencies.

Rocco Musolino
3

You can clear and close the stream with yourstream.resume(), which will dump everything on the stream and eventually close it.

From the official docs:

readable.resume():

Return: this

This method will cause the readable stream to resume emitting 'data' events.

This method will switch the stream into flowing mode. If you do not want to consume the data from a stream, but you do want to get to its 'end' event, you can call stream.resume() to open the flow of data.

var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', () => {
  console.log('got to the end, but did not read anything');
});
Sayem
  • This can be called "draining" the stream. In our case, of course we had a `'data'` event listener, but we made it check a boolean `if (!ignoring) { ... }` so it won't process data when we are draining the stream. `ignoring = true; readable.resume();` – joeytwiddle Sep 27 '16 at 02:21
  • 5
    Of course this assumes the stream will `'end'` at some point. Not all streams will do that! (E.g. a stream that sends the date every second, forever.) – joeytwiddle Sep 27 '16 at 02:26
3

It's an old question, but I too was looking for the answer and found the best one for my implementation. Both the end and close events get emitted, so I think this is the cleanest solution.

This will do the trick in node 4.4.* (stable version at the time of writing):

var fs = require('fs');

var input = fs.createReadStream('lines.txt');

input.on('data', function(data) {
   if (gotFirstLine) {
      this.end(); // Simple isn't it?
      console.log("Closed.");
   }
});

For a very detailed explanation see: http://www.bennadel.com/blog/2692-you-have-to-explicitly-end-streams-after-pipes-break-in-node-js.htm

Vexter
2

This code here will do the trick nicely:

function closeReadStream(stream) {
    if (!stream) return;
    if (stream.close) stream.close();
    else if (stream.destroy) stream.destroy();
}

writeStream.end() is the go-to way to close a writeStream...

g00dnatur3
0

To stop callback execution after some point, you have to use process.kill with the particular process ID.

const csv = require('csv-parser');
const fs = require('fs');

const filepath = "./demo.csv"
let readStream = fs.createReadStream(filepath, {
    autoClose: true,
});
let MAX_LINE = 0;


readStream.on('error', (e) => {
        console.log(e);
        console.log("error");
    })

    .pipe(csv())
    .on('data', (row) => {

        if (MAX_LINE == 2) {
            process.kill(process.pid, 'SIGTERM')
        }
        // console.log("not 2");
        MAX_LINE++
        console.log(row);
    })

    .on('end', () => {
        // handle end of CSV
        console.log("read done");
    }).on("close", function () {
        console.log("closed");
    })