
Consider the code below ... I am trying to pause the stream after reading the first 5 lines:

var fs          = require('fs');
var readline    = require('readline');
var stream      = require('stream');
var numlines    = 0;
var instream    = fs.createReadStream("myfile.json");
var outstream   = new stream;
var readStream = readline.createInterface(instream, outstream);
readStream.on('line', function(line){
  numlines++;
  console.log("Read " + numlines + " lines");
  if (numlines >= 5) {
    console.log("Pausing stream");
    readStream.pause();
  }
});

The output (copied below) suggests that it keeps reading lines after the pause. Perhaps readline has queued up a few more lines in the buffer and is feeding them to me anyway ... this would make sense if it continues to read asynchronously in the background, but based on the documentation I don't know what the proper behavior should be. Any recommendations on how to achieve the desired effect?

Read 1 lines
Read 2 lines
Read 3 lines
Read 4 lines
Read 5 lines
Pausing stream
Read 6 lines
Pausing stream
Read 7 lines

4 Answers

23

Somewhat unintuitively, the pause method does not stop queued-up 'line' events:

Calling rl.pause() does not immediately pause other events (including 'line') from being emitted by the readline.Interface instance.

There is, however, a third-party module named line-by-line whose pause method does stop 'line' events from being emitted until it is resumed.

var LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

lr.on('error', function (err) {
  // 'err' contains error object
});

lr.on('line', function (line) {
  // pause emitting of lines...
  lr.pause();

  // ...do your asynchronous line processing..
  setTimeout(function () {

      // ...and continue emitting lines.
      lr.resume();
  }, 100);
});

lr.on('end', function () {
  // All lines are read, file is closed now.
});

(I have no affiliation with the module, just found it useful for dealing with this issue.)

Alexander O'Mara
  • Thanks for this answer. Out of interest, how common is such a requirement? I'm parsing an 80 GB CSV that needs to be streamed to a server. What other use cases are there? – Zach Smith Jul 18 '17 at 19:31
  • 3
    @ZachSmith I found being able to pause and resume at will very useful for when the callback cannot or should not be completed synchronously (say, inserting lines into a database). If you are reading lines faster than you can process them, you could queue up too many requests and run out of memory. – Alexander O'Mara Jul 19 '17 at 06:50
  • My use case is reading a large-ish (<100MB) file in which ordering is important (header, data, data, data, header, data, data, ...) into a MySQL stream that queries the database every few thousand lines. – Michał Tatarynowicz Jan 13 '21 at 10:18
11

So, it turns out that the readline stream tends to "drip" (i.e., leak a few extra lines) even after a pause(). The documentation does not make this clear, but it's true.

If you want the pause() toggle to appear immediate, you'll have to create your own line buffer and accumulate the leftover lines yourself.
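
For example, a minimal sketch of such a wrapper, staying with the built-in readline (handleLine, pauseLines, and resumeLines are illustrative names, not part of any API):

var fs = require('fs');
var readline = require('readline');

var rl = readline.createInterface({ input: fs.createReadStream('myfile.json') });
var leftover = [];   // lines that "drip" through after pause()
var paused = false;

rl.on('line', function (line) {
  if (paused) {
    // readline had already queued this line; stash it instead of processing.
    leftover.push(line);
  } else {
    handleLine(line);
  }
});

function pauseLines() {
  paused = true;
  rl.pause();
}

function resumeLines() {
  paused = false;
  // Drain the stashed lines first, in order; handleLine may pause again.
  while (leftover.length && !paused) {
    handleLine(leftover.shift());
  }
  if (!paused) rl.resume();
}

function handleLine(line) {
  // ...process one line; call pauseLines() when you want to stop...
}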

4

To add some points — attach a 'pause' handler and log the line count:

.on('pause', function() {
    console.log(numlines)
})

You will get 5. As mentioned in the Node.js documentation, the 'pause' event is emitted when the input stream is paused, or when:

  • The input stream is not paused and receives the SIGCONT event. (See events SIGTSTP and SIGCONT)

So, I created a temporary buffer in the 'line' event handler and used a flag to track whether the stream is paused:

.on('line', function(line) {
   if (paused) {
      putLineInBulkTmp(line);
   } else {
      putLineInBulk(line);
   }
})

Then, in the 'pause' and 'resume' handlers:

.on('pause', function() {
    paused = true;
    doSomething(bulk, function(resp) {
        // Start the next batch from the lines buffered while paused.
        bulk = clone(bulktmp);
        bulktmp = [];
        lr.resume();
    });
})
.on('resume', function() {
    paused = false;
})

This approach handles this kind of situation.
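
Putting the fragments together, a rough self-contained sketch of this batching approach, written against the built-in readline interface (which does emit 'pause' and 'resume' events); the batch size of 1000 and the doSomething body are placeholders:

var fs = require('fs');
var readline = require('readline');

var rl = readline.createInterface({ input: fs.createReadStream('big_file.txt') });
var paused = false;
var bulk = [];      // batch currently being collected
var bulktmp = [];   // lines that drip in while paused

rl.on('line', function (line) {
  if (paused) {
    bulktmp.push(line);
  } else {
    bulk.push(line);
    if (bulk.length >= 1000) {
      rl.pause();   // fires the 'pause' handler below
    }
  }
});

rl.on('pause', function () {
  paused = true;
  doSomething(bulk, function () {
    // Seed the next batch with the lines buffered while we were busy.
    bulk = bulktmp.slice();
    bulktmp = [];
    rl.resume();
  });
});

rl.on('resume', function () {
  paused = false;
});

rl.on('close', function () {
  // Flush whatever is left in the final, partial batch.
  doSomething(bulk.concat(bulktmp), function () {});
});

function doSomething(lines, done) {
  // Placeholder for the real asynchronous batch work (e.g. a bulk insert).
  setImmediate(done);
}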

Yang Young
  • 1
    I think there is a possibility of losing some data between `bulk = clone(bulktmp);` and `lr.resume();` – I.K. Sep 25 '19 at 09:33
0

You can adjust the amount of internal buffering the underlying read stream performs via the highWaterMark option. See https://nodejs.org/api/stream.html#buffering
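
For example, a minimal sketch (1024 bytes is just an illustrative value; fs read streams default to 64 KiB):

var fs = require('fs');
var readline = require('readline');

// A smaller highWaterMark makes the underlying stream buffer fewer bytes,
// so fewer already-read lines can still be emitted after rl.pause().
var instream = fs.createReadStream('myfile.json', { highWaterMark: 1024 });
var rl = readline.createInterface({ input: instream });

This shrinks the window of queued data but does not eliminate the extra 'line' events entirely.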

Ken Lin