7

I am reading a CSV file line by line and inserting/updating each row in MongoDB. For each row, the expected output order is: 1. console.log(row); 2. console.log(cursor); 3. console.log("stream");

But I am getting output like this instead: 1. console.log(row); console.log(row); console.log(row); console.log(row); console.log(row); ............ ............ 2. console.log(cursor); 3. console.log("stream"); In other words, all the rows print first, and only then do the cursor and "stream" logs appear. Please let me know what I am missing here.

const csv = require('csv-parser');
const fs = require('fs');

var mongodb = require("mongodb");

var client = mongodb.MongoClient;
var url = "mongodb://localhost:27017/";
var collection;
client.connect(url,{ useUnifiedTopology: true }, function (err, client) {

  var db = client.db("UKCompanies");
  collection = db.collection("company");
  startRead();
});
var cursor={};

async function insertRec(row){
  console.log(row);
  cursor = await collection.update({CompanyNumber:23}, row, {upsert: true});
  if(cursor){
    console.log(cursor);
  }else{
    console.log('not exist')
  }
  console.log("stream");
}



async function startRead() {
  fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      await insertRec(row);
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}
Pritam Parua

3 Answers

22

In your startRead() function, the await insertRec() does not stop more data events from flowing while the insertRec() is processing. So, if you don't want the next data event to run until the insertRec() is done, you need to pause, then resume the stream.

async function startRead() {
  const stream = fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      try {
        stream.pause();
        await insertRec(row);
      } finally {
        stream.resume();
      }
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}

FYI, you also need some error handling if insertRec() fails.
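A minimal sketch of one way to do that, assuming you want to stop on the first failed insert rather than keep reading (destroying the stream routes the failure to an 'error' handler; the exact handling here is illustrative):

async function startRead() {
  const stream = fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      stream.pause();
      try {
        await insertRec(row);
        stream.resume();
      } catch (err) {
        // Stop reading and surface the failure instead of continuing silently.
        stream.destroy(err);
      }
    })
    .on('error', (err) => {
      console.error('CSV processing failed:', err);
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}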

jfriend00
2

That is expected behavior in this case, because your 'data' listener triggers insertRec() asynchronously as and when data is available in the stream. That is why the first line of your insert method keeps executing, more or less in parallel, before the earlier inserts have finished. If you want to control this behavior you can use the highWaterMark property (https://nodejs.org/api/stream.html#stream_readable_readablehighwatermark) when creating the read stream. This way you will get 1 record at a time, but I am not sure what your use case is.

Something like this:

fs.createReadStream(`somefile.csv`, {
  "highWaterMark": 1
})

Also, you are not awaiting your startRead() method. I would wrap it inside a Promise and resolve it in the end listener, otherwise you will not know when the processing has finished. Something like:

function startRead() {
  return new Promise((resolve, reject) => {
    fs.createReadStream(`somepath`)
      .pipe(csv())
      .on("data", async row => {
        await insertRec(row);
      })
      .on("error", err => {
        reject(err);
      })
      .on("end", () => {
        console.log("CSV file successfully processed");
        resolve();
      });
  });
}
Ashish Modi
  • Setting the highWaterMark does not let you throttle the rate of `data` events. The OP should instead implement a stream Writable that can be configured to `write` document-by-document, or `writev` a bulk of documents. The `highWaterMark` lets you control memory pressure (see the sketch after this comment thread). – jorgenkg Feb 01 '20 at 15:47
  • @jorgenkg that is true. Thanks for clarifying. – Ashish Modi Feb 01 '20 at 15:49
  • @jorgenkg - "For streams operating in object mode, the highWaterMark specifies a total number of objects" - https://nodejs.org/api/stream.html#stream_buffering – Ashish Modi Feb 01 '20 at 16:02
  • Yes - the number of objects that will be buffered in the (read/write) stream's internal buffer. Objects will always be processed one at a time with `write`. The highWaterMark indicates how many objects can be buffered for the stream instance. – jorgenkg Feb 03 '20 at 12:23
  • Also in this example, the last result is not awaited as the "end" event will fire during its execution of "data" event handler. If the process were to now exit for example there would be data loss in that last record. – simbolo Oct 04 '22 at 10:37
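For reference, a minimal sketch of the object-mode Writable approach jorgenkg describes in the comments above: the write() callback is only invoked after the insert completes, so rows are processed strictly one at a time and the final insert finishes before the pipeline completes. The pipeline() wiring shown here is illustrative, not part of the original answer.

const { Writable, pipeline } = require('stream');

function startRead() {
  return new Promise((resolve, reject) => {
    const inserter = new Writable({
      objectMode: true,
      write(row, _encoding, callback) {
        // The next row is not delivered until callback() is called,
        // so inserts never overlap.
        insertRec(row)
          .then(() => callback())
          .catch(callback);
      }
    });

    pipeline(
      fs.createReadStream('./data/inside/6.csv'),
      csv(),
      inserter,
      (err) => (err ? reject(err) : resolve())
    );
  });
}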
2

From Node 10+, readable streams have a Symbol.asyncIterator property, which allows processing the stream with for await...of. Because the next chunk is not read until the loop body's await completes, the rows are naturally processed one at a time:

async function startRead() {
    const readStream = fs.createReadStream('./data/inside/6.csv');    
    
    for await (const row of readStream.pipe(csv())) {
        await insertRec(row);
    }

    console.log('CSV file successfully processed');
}
Alex K