12

I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the file, I'd like to use streaming.

This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream) and return a promise when the file is read to end (resolved) or errors (rejected).

So, I start with:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Now, I have two inter-related issues:

  1. I need to throttle the actual amount of data being processed, so as to not create memory pressures.
  2. The function passed as the processor param is going to often be async, such as saving the contents of the file to the db via a library that is promise-based (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.

The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my ahead around how to mix stream event handlers with these promise methods. Right now, I return a promise in the handler for readable section after each read(), which means I create a huge amount of promised database operations and eventually fault out because I hit a process memory limit.

Does anyone have a working example of this that I can use as a jumping point?

UPDATE: Probably more than one way to skin the cat, but this works:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Anyone sees a potential problem with this approach?

vitaly-t
  • 24,279
  • 15
  • 116
  • 138
alphadogg
  • 12,762
  • 9
  • 54
  • 88
  • Looks neat, and if this works, then great job! I'm glad that the most recent addition of `page` into `pg-promise` wasn't in vain ;) – vitaly-t Oct 16 '15 at 03:12
  • Just simplified it in the end of readDataFromStream ;) You do not need to `return undefined`, that's what happens when you return nothing anyway ;) – vitaly-t Oct 16 '15 at 03:19
  • Actually, there may be a problem with this... when you call db.task, yo do not handle the result from it, so in case it rejects, there will be an error thrown by the promise library that your reject wasn't handled. – vitaly-t Oct 17 '15 at 23:06
  • Should I do a `return this.page()` with a `catch()` on `task()`? – alphadogg Oct 19 '15 at 19:36
  • I have updated my answer - it gives your the whole picture of how to solve your problem. – vitaly-t Oct 19 '15 at 20:48

4 Answers4

8

You might want to look at promise-streams

var ps = require('promise-streams');
passedStream
  .pipe(csv.parse({trim: true}))
  .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
  .wait().then(_ => {
    console.log("All done!");
  });

Works with backpressure and everything.

Gjorgi Kjosev
  • 1,559
  • 14
  • 24
5

Find below a complete application that correctly executes the same kind of task as you want: It reads a file as a stream, parses it as a CSV and inserts each row into the database.

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    }
    .catch(error => {
        console.log('ERROR:', error);
    });

Note that the only thing I changed: using library csv-parse instead of csv, as a better alternative.

Added use of method stream.read from the spex library, which properly serves a Readable stream for use with promises.

vitaly-t
  • 24,279
  • 15
  • 116
  • 138
  • Wouldn't this try to read the next item from the `parser` after `query("INSERT…")` is done, irregardless of whether the next item is readable already? Or does `parser.read()` return a promise? – Bergi Oct 15 '15 at 04:21
  • Also, what happened to the promise-returning `processor` callback function that OP was looking for? – Bergi Oct 15 '15 at 04:21
  • @Bergi My understanding was that parser.read() is synchronous, the way it was shown. And if it turns out to be not, then it will need to be wrapped into a promise, obviously. And `readable` is fired once, not for each read operation, that's my understanding. As for the promise-returning processor, he was simply looking for a resolve when the data processing has finished, and a reject in case it failed, which my example provides, i.e. the task will resolve/reject accordingly. – vitaly-t Oct 15 '15 at 04:27
  • Hm, I'd need to read the [stream docs](https://nodejs.org/api/stream.html#stream_class_stream_readable) again, but I don't think that's how it works – Bergi Oct 15 '15 at 04:35
  • Yeah, I'm not quite sure about the streams part myself, and I wrote the example based on the code provided with the question. If that code is wrong, then so would be mine. But still, it does show the general approach. – vitaly-t Oct 15 '15 at 04:38
  • The answer has been rewritten completely, based on a fully tested piece of code. – vitaly-t Oct 19 '15 at 21:52
  • @vitaly-t: I looked through your addition to `spex` for `read()` and it's quite beautiful and tidy. ;) My code above was just my experimentation; hadn't refactored it and cleaned it up yet. Thank you. BTW, `csv-parse` is part of the whole `csv` module, or is the smiley face acknowledgement of that? – alphadogg Oct 20 '15 at 12:25
  • @vitaly-t: Oh, and, in an earlier comment, you state that the `readable` event is only fired once. But, in fact, it can be fired multiple times. Does this affect your code? – alphadogg Oct 20 '15 at 12:52
  • @alphadogg, about `readable` fired once - that's now not relevant, please ignore that, it was for the old code sample I provided. Choice of CSV isn't really important here. – vitaly-t Oct 20 '15 at 17:12
  • I get a weird error where the amount of lines read is actually much higher than the amount of operations done: DATA: { calls: 36005, reads: 4129369, length: 0, duration: 169852 } is that intended? – Qiong Wu Jul 16 '16 at 01:13
  • @QiongWu this usually means that your code is taking long time to process the data received. – vitaly-t Jul 16 '16 at 02:00
  • but how can I ensure that every read results in a call in the end, even if the code takes long time to process the data? right now it seems that a good portion of the lines read are getting lost – Qiong Wu Jul 16 '16 at 14:36
  • @QiongWu nothing should be lost, you will get your data in chunks. – vitaly-t Jul 16 '16 at 21:59
2

I found a slightly better way of doing the same thing; with more control. This is a minimal skeleton with precise parallelism control. With parallel value as one all records are processed in sequence without having the entire file in memory, we can increase parallel value for faster processing.

      const csv = require('csv');
      const csvParser = require('csv-parser')
      const fs = require('fs');

      const readStream = fs.createReadStream('IN');
      const writeStream = fs.createWriteStream('OUT');

      const transform = csv.transform({ parallel: 1 }, (record, done) => {
                                           asyncTask(...) // return Promise
                                           .then(result => {
                                             // ... do something when success
                                             return done(null, record);
                                           }, (err) => {
                                             // ... do something when error
                                             return done(null, record);
                                           })
                                       }
                                     );

      readStream
      .pipe(csvParser())
      .pipe(transform)
      .pipe(csv.stringify())
      .pipe(writeStream);

This allows doing an async task for each record.

To return a promise instead we can return with an empty promise, and complete it when stream finishes.

    .on('end',function() {
      //do something wiht csvData
      console.log(csvData);
    });
Gagandeep Kalra
  • 1,034
  • 14
  • 13
1

So to say you don't want streaming but some kind of data chunks? ;-)

Do you know https://github.com/substack/stream-handbook?

I think the simplest approach without changing your architecture would be some kind of promise pool. e.g. https://github.com/timdp/es6-promise-pool

Markus
  • 512
  • 1
  • 4
  • 21
  • Well, I have thought of using `async.queue` in the function, returning a promise of eventually finishing the file (or not). However, I was wondering how one ties a promise library like Bluebird with the typical stream-based processing of large files. ('pg-promise` includes `spex`, which provides for higher-level promise functions) – alphadogg Oct 14 '15 at 18:45