3

I need to read a logfile with thousands of lines and write each line to a Mongo database. I am reading the file using a node stream. I am splitting the file into 'lines' using the 'split' npm package. The MongoDB write will take a lot longer than the logfile read due to network considerations.

My core code looks like this:

var fs = require('fs');
var split = require('split');

var readableStream = fs.createReadStream(filename);

            readableStream
                .pipe(split()) // This splits the data into 'lines'
                .on('data', function (chunk) {

                    chunkCount++;
                    slowAsyncFunctionToWriteLogEntryToDatabase(chunk); // This will take ages

                })
                .on('end', function () {
                    // resolve the promise which bounds this process
                    defer.resolve({v:3,chunkCount: chunkCount})

                });

Do I need to worry that the MongoDB system will be hammered by the number of writes being queued? Presumably the node pipe back-pressure mechanism won't know that lots of db writes are being queued? Is there any way to 'slow' the readable stream so that it waits for each MongoDB insert to finish before it reads the next line from the logfile? Am I worrying unnecessarily?

Journeyman
  • I guess you could read the entire log file and then insert all the documents in a single db call with db.collection.insertMany(). It would be way faster – felix Nov 24 '16 at 12:12

2 Answers

2

Since working with pause() and resume() seems to have some problems, I will write up another option, which is to use a Transform stream.

var Transform = require('stream').Transform;

var myTransform = new Transform({
    transform(chunk, encoding, cb) {
        chunkCount++;

        // The next line is not passed to transform() until cb() is called,
        // so each database write finishes before the following line is handled.
        syncFunctionToWriteLogEntryWithCallback( chunk, function() {
            cb();
        } );
    },

    flush(cb) {
        // Every line has already been written in transform(), so there is
        // nothing left to flush; just signal that the stream is done.
        cb();
    }
});

readableStream
        .pipe( split() )
        .pipe( myTransform );

Using a Transform stream allows you to supply a callback for each chunk; the stream will not hand transform() the next line until you call it, so the reads stay in step with your database writes.
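If you still need the promise from the question to resolve once everything has been written, one way (a sketch that reuses the question's defer and chunkCount variables) is to listen for the Transform's 'finish' event:

readableStream
        .pipe( split() )
        .pipe( myTransform )
        .on( 'finish', function () {
            // 'finish' fires once the input has ended and every line has been
            // handled by transform(), i.e. after the last database callback above has run.
            defer.resolve({ v: 3, chunkCount: chunkCount });
        } );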

drinchev
  • Great, that's now behaving exactly as I wanted it to - it allows each line to be fully processed before the stream moves on to the next line. Perfect solution, although the performance is now terrible, presumably because of the way I'm pushing the requests to Mongo! Thanks very much for your help. – Journeyman Nov 24 '16 at 19:08
  • A performance improvement might be to use another MongoDB insert function, such as [`bulk`](https://docs.mongodb.com/v3.2/reference/method/Bulk.insert/) – drinchev Nov 24 '16 at 19:12
  • I changed the insert to insert arrays of docs instead of individual docs, and this has dramatically improved performance (see the batching sketch after these comments). All set to go! Thanks very much for your help again :) – Journeyman Nov 25 '16 at 15:42
  • @drinchev - ran into a similar issue and tried bulk, but the way I did it was to call insert on `data` and `execute` on `end`, which effectively makes streaming useless and the code died in flames of heap memory :) This answer https://stackoverflow.com/a/33360069/856498 however worked amazingly well for pulling in a huge gzipped csv and inserting rows after transforms. – cyberwombat Dec 13 '17 at 23:42
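To illustrate the batching idea from the comments above, here is a rough sketch (not the actual code used; the db handle, the 'logEntries' collection name and the batch size are all assumptions) that buffers lines inside a Transform and writes each batch with insertMany():

var Transform = require('stream').Transform;

var BATCH_SIZE = 1000;   // assumption: tune for your document size and network
var batch = [];

function writeBatch(docs, cb) {
    if (docs.length === 0) return cb();                  // insertMany rejects empty arrays
    db.collection('logEntries').insertMany(docs, cb);    // 'logEntries' is a placeholder name
}

var batchingTransform = new Transform({
    transform(line, encoding, cb) {
        batch.push({ raw: line.toString() });
        if (batch.length >= BATCH_SIZE) {
            var docs = batch;
            batch = [];
            writeBatch(docs, cb);   // the stream waits for the insert before reading more lines
        } else {
            cb();
        }
    },
    flush(cb) {
        writeBatch(batch, cb);      // write whatever is left when the file ends
    }
});

readableStream
        .pipe( split() )
        .pipe( batchingTransform );

This keeps at most BATCH_SIZE unwritten lines in memory at a time, so it stays safe for very large files while still getting the benefit of batched inserts.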

0

You can use the pause() method on the readable stream to stop it while you write your chunk to MongoDB.

readableStream
            .pipe(split()) // This splits the data into 'lines'
            .on('data', function (chunk) {

                readableStream.pause()

                chunkCount++;

                syncFunctionToWriteLogEntryWithCallback( chunk, function() {
                    readableStream.resume();
                } );

            })
            .on('end', function () {
                // resolve the promise which bounds this process
                defer.resolve({v:3,chunkCount: chunkCount})

            });

I don't think there will be a significant problem with MongoDB in this case.
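If pause() still lets too many lines through, a variation worth experimenting with (a sketch, assuming the stream returned by split() supports pause() and resume() like any standard stream) is to pause the stream that actually emits the 'data' events, i.e. the one returned by .pipe(split()), rather than the underlying file stream:

var lineStream = readableStream.pipe(split());

lineStream
        .on('data', function (chunk) {

            lineStream.pause();          // stop further 'data' events from the line stream itself

            chunkCount++;

            syncFunctionToWriteLogEntryWithCallback( chunk, function() {
                lineStream.resume();     // ask for the next line once the write has finished
            } );

        })
        .on('end', function () {
            // resolve the promise which bounds this process
            defer.resolve({v:3,chunkCount: chunkCount})

        });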

drinchev
  • thanks. I did look at .pause() but the docs state that .pause() doesn't stop the stream immediately, but that several further chunks might be delivered before the pause occurs. If resume has already been called at that point then it seemed to me that the desired effect might be wholly negated by this aspect of .pause(). But I will experiment with it and see how it behaves. – Journeyman Nov 24 '16 at 15:04
  • So, I added pause() and resume(). For a logfile with 38508 lines, by the time we get to the 'end' of the stream, 37913 Mongo writes are still 'queued' awaiting processing, which suggests that the pause()/resume() mechanism isn't really helping to throttle the loop. I'd really like a mechanism that keeps the read/write in step. – Journeyman Nov 24 '16 at 15:58
  • Hmm. Okay! Maybe doing it with a `Transform` stream will be more efficient. I will try to put together an example – drinchev Nov 24 '16 at 17:29