39

I'm writing a large file with node.js using a writable stream:

var fs     = require('fs');
var stream = fs.createWriteStream('someFile.txt', { flags : 'w' });

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write( lines[i] );
    }
}

I'm wondering if this scheme is safe without using the drain event? If it is not (which I think is the case), what is the pattern for writing arbitrarily large amounts of data to a file?

nab
  • To drain you can use Promises https://stackoverflow.com/questions/50357777/why-does-attempting-to-write-a-large-file-cause-js-heap-to-run-out-of-memory#answer-50360972 – Junior Usca Jun 03 '21 at 21:52

7 Answers

29

That's how I finally did it. The idea is to create a readable stream that implements the ReadStream interface, and then use the pipe() method to pipe data to the writable stream.

var fs = require('fs');
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' });
var readStream = new MyReadStream();

readStream.pipe(writeStream);
writeStream.on('close', function () {
    console.log('All done!');
});

An example of a MyReadStream class can be taken from mongoose's QueryStream.
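For illustration, here is a minimal sketch of what such a readable stream could look like using the stream.Readable base class; it assumes getLines() synchronously returns an array of strings, as in the question, and a falsy value once the data is exhausted:

var stream = require('stream');
var util   = require('util');

// Minimal readable stream that pulls chunks from getLines() (the question's data source).
function MyReadStream() {
    stream.Readable.call(this);
    this.pending = [];
}
util.inherits(MyReadStream, stream.Readable);

MyReadStream.prototype._read = function () {
    if (this.pending.length === 0) {
        this.pending = getLines() || [];
        if (this.pending.length === 0) {
            this.push(null); // no more data: end the stream
            return;
        }
    }
    this.push(this.pending.shift()); // push one line; _read is called again when more is wanted
};

Because _read is only called when the internal buffer has room, the back-pressure and draining are handled by pipe() for you.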

nab
13

The idea behind drain is that you would test the return value of write() here:

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write(lines[i]); //<-- the place to test
    }
}

which you're not doing. So you would need to rearchitect it to make it "reentrant".

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
        }
    }
}

However, does this mean that you need to keep buffering getLines as well while you wait?

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines,
    buffer = {
        remainingLines: []
    };
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
           buffer.remainingLines = lines.slice(i);
           break;
           //notice there's no way to re-run this once we leave here.
        }
    }
}

stream.on('drain',function(){
  if (buffer.remainingLines.length){
    for (var i = 0; i < buffer.remainingLines.length; i++) {
      var written = stream.write(buffer.remainingLines[i]); //<-- the place to test
      if (!written){
       //do something here to wait till you can safely write again
       //this means prepare a buffer and wait till you can come back to finish
       //  buffer.remainingLines[i] -> remainder
       buffer.remainingLines = buffer.remainingLines.slice(i);
      }
    }
  }
});
jcolebrand
  • It's unnecessary to use your own buffer. Node.js has done it for you. Read the source file nodejs-source/lib/fs.js#WriteStream.prototype.write – ayanamist Sep 17 '13 at 01:37
7

The cleanest way to handle this is to make your line generator a readable stream - let's call it lineReader. Then the following would automatically handle the buffers and draining nicely for you:

lineReader.pipe(fs.createWriteStream('someFile.txt'));

If you don't want to make a readable stream, you can listen to write's output for buffer-fullness and respond like this:

var i = 0, n = lines.length;
function write () {
  if (i === n) return;  // A callback could go here to know when it's done.
  while (stream.write(lines[i++]) && i < n);
  stream.once('drain', write);
}
write();  // Initial call.

A longer example of this situation can be found here.
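On more recent Node versions (11.13+ for events.once), the same back-pressure handling can be written with async/await; a rough sketch, assuming lines is an array of strings and stream is the write stream from the question:

const { once } = require('events');

// Write every line, pausing whenever write() signals back-pressure.
async function writeLines(stream, lines) {
    for (const line of lines) {
        if (!stream.write(line)) {
            await once(stream, 'drain'); // wait until it is safe to write again
        }
    }
    stream.end();
    await once(stream, 'finish'); // all data has been flushed to the file
}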

Tyler
4

I found streams to be a poorly performing way to deal with large files - this is because you cannot set an adequate input buffer size (at least I'm not aware of a good way to do it). This is what I do:

var fs = require('fs');

var i = fs.openSync('input.txt', 'r');
var o = fs.openSync('output.txt', 'w');

// 1 MB read buffer; prev carries any partial line left over from the previous chunk
var buf = new Buffer(1024 * 1024), len, prev = '';

while(len = fs.readSync(i, buf, 0, buf.length)) {

    // split the chunk into lines, prepending the carried-over partial line
    var a = (prev + buf.toString('ascii', 0, len)).split('\n');
    // if the buffer was full, the last element may be an incomplete line; carry it over
    prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : '';

    var out = '';
    a.forEach(function(line) {

        if(!line)
            return;

        // do something with your line here

        out += line + '\n';
    });

    var bout = new Buffer(out, 'ascii');
    fs.writeSync(o, bout, 0, bout.length);
}

fs.closeSync(o);
fs.closeSync(i);
youurayy
3

Several suggested answers to this question have missed the point about streams altogether.

This module can help https://www.npmjs.org/package/JSONStream

However, let's suppose the situation as described and write the code ourselves. You are reading from MongoDB as a stream, with objectMode = true by default.

This will lead to issues if you try to stream directly to a file - something like an "Invalid non-string/buffer chunk" error.

The solution to this type of problem is very simple.

Just put another Transform in between the readable and writable to adapt the object readable into a string writable appropriately.

Sample Code Solution:

var fs = require('fs'),
    writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }),
    stream = require('stream'),
    stringifier = new stream.Transform();
stringifier._writableState.objectMode = true;
stringifier._transform = function (data, encoding, done) {
    this.push(JSON.stringify(data));
    this.push('\n');
    done();
}
rowFeedDao.getRowFeedsStream(merchantId, jobId)
.pipe(stringifier)
.pipe(writeStream)
.on('error', function (err) {
    // handle error condition
});
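Poking at _writableState works, but the same stringifier can also be declared through the Transform constructor options supported by newer stream APIs; a sketch along those lines (same logic, just a different way of wiring it up):

var stream = require('stream');

// Equivalent stringifier built with constructor options instead of _writableState.
var stringifier = new stream.Transform({
    writableObjectMode: true,            // accept objects on the writable side
    transform: function (data, encoding, done) {
        this.push(JSON.stringify(data)); // emit newline-delimited JSON strings
        this.push('\n');
        done();
    }
});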
arcseldon
2

[Edit] The updated Node.js writable.write(...) API docs say:

[The] return value is strictly advisory. You MAY continue to write, even if it returns false. However, writes will be buffered in memory, so it is best not to do this excessively. Instead, wait for the drain event before writing more data.

[Original] From the stream.write(...) documentation (emphasis mine):

Returns true if the string has been flushed to the kernel buffer. Returns false to indicate that the kernel buffer is full, and the data will be sent out in the future.

I interpret this to mean that the "write" function returns true if the given string was immediately written to the underlying OS buffer, and false if it was not yet written but will be written by the write function (i.e. it was presumably buffered for you by the WriteStream), so that you do not have to call "write" again.
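In other words, a false return value does not lose data; it only means the internal buffer is past its high-water mark. A tiny sketch of the advisory pattern, assuming stream is the write stream from the question:

var ok = stream.write(chunk); // chunk: whatever piece of data you are writing
if (!ok) {
    // the data was still accepted, but pause until the buffer has flushed
    stream.once('drain', function () {
        // safe to resume writing here
    });
}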

maerics
  • But "When writing a file descriptor in this manner, closing the descriptor before the stream drains risks sending an invalid (closed) FD." makes me think that the buffer being full means it can't accept any more data from you. I honestly don't know, and only gave it my best guess as an answer here. – jcolebrand Feb 28 '12 at 17:57
  • @jcolebrand: ya, I don't know either, but I'm guessing the "drain" event just signals that the OS is ready for writing immediately, in case you really want to avoid buffering of any sort, be it your own or from the WriteStream "write" method. However, the docs for "drain" mention "*safe to write again*", which is either a poor choice of wording or evidence against my interpretation! – maerics Feb 28 '12 at 17:59
1

If you do not happen to have an input stream, you cannot easily use pipe. None of the above worked for me; the drain event doesn't fire. Solved as follows (based on Tyler's answer):

var fs = require('fs');
var wstream = fs.createWriteStream('someFile.txt'); // the target write stream
var lines = []; // some very large array
var i = 0;

function write() {
    if (i < lines.length)  {
        wstream.write(lines[i], function(err){
            if (err) {
                console.log(err);
            } else {
                i++;
                write();
            }
        });
    } else {
        wstream.end();
        console.log("done");
    }
};
write();
anneb