
I am trying to figure out how to read a text/CSV file with a Node.js stream, so that I can:

  • Read a batch of lines
  • Pause the reading
  • Do some async processing: Parse all lines to an array of objects, then send them to an API.
  • Resume reading
  • Go to step 1

I have been using readline with some success, but it doesn't quite work: pause() doesn't stop it right away, and it keeps emitting lines while the previous batch is still being processed. The end result is that the downstream API chokes on concurrent batches and I get throttled.
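Roughly what I have now (processBatch is a placeholder for my parse-and-send step):

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
  input: fs.createReadStream('FILE_PATH'),
  crlfDelay: Infinity,
});

let batch = [];

rl.on('line', (line) => {
  batch.push(line);
  if (batch.length >= 1000) {
    rl.pause(); // lines already queued keep firing after this
    processBatch(batch).then(() => {
      batch = [];
      rl.resume();
    });
  }
});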

I would like to "close the faucet" while a batch is processed, so that if I get throttled I can back off and retry until the batch is fully processed, THEN resume reading.

There is an old package, event-stream, based on split, that handles the line splitting, but both projects are archived.

Is this something that can be solved with a Transform stream?

2 Answers


The previous solution actually led to some duplicated processing. The quickest way I found to solve this is the following, still using csv-parse:

import { parse } from 'csv-parse'; // csv-parse v5+ exposes a named export
import fs from 'fs';

const stream = fs.createReadStream('FILE_PATH').pipe(parse());

let recordsInBatch = [];
let recordsCounter = 0;
const BATCH_SIZE = 1000;

for await (const record of stream) {
  recordsInBatch.push(record);
  recordsCounter += 1;
  if (recordsCounter >= BATCH_SIZE) {
    await yourAsyncCall(recordsInBatch);
    recordsCounter = 0;
    recordsInBatch = [];
  }
}

if (recordsInBatch.length > 0) {
  // flush the final, partial batch
  await yourAsyncCall(recordsInBatch);
}
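Because the for await loop does not pull the next record until the awaited call resolves, the stream is effectively paused while each batch is processed. If you also want the backoff-and-retry behaviour from the question, yourAsyncCall could look something like this (sendToApi and the retry numbers are placeholders, not part of any library):

const MAX_RETRIES = 5;

async function yourAsyncCall(records) {
  for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
    try {
      await sendToApi(records); // placeholder for your API client
      return;
    } catch (err) {
      if (attempt === MAX_RETRIES) throw err;
      // exponential backoff: 1s, 2s, 4s, ...
      await new Promise((resolve) => setTimeout(resolve, 1000 * 2 ** attempt));
    }
  }
}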

example with csv

npm i csv-parser --save

const csv = require('csv-parser');
const fs = require('fs');

let counter = 0;
let batch = [];

const stream = fs.createReadStream('FILE_PATH')
  .pipe(csv())
  .on('data', (data) => {
    console.log('data', data);

    batch.push(data);
    counter++;

    if (counter > 5000) {
      stream.pause();

      setTimeout(() => {
        // YOUR ASYNC PROCESSING
        counter = 0;
        batch = [];
        stream.resume();
      }, 5000);
    }
  })
  .on('error', (e) => {
    console.error(e);
  })
  .on('end', () => {
    console.log('end');

    // YOUR ASYNC PROCESSING
  });
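If your processing returns a promise, you can pause the stream and resume it once the promise settles, instead of relying on a fixed setTimeout (processBatch here is a placeholder for your own async call):

const csv = require('csv-parser');
const fs = require('fs');

let counter = 0;
let batch = [];

const stream = fs.createReadStream('FILE_PATH').pipe(csv());

stream.on('data', (data) => {
  batch.push(data);
  counter++;

  if (counter > 5000) {
    stream.pause(); // stop pulling new rows
    processBatch(batch)
      .then(() => {
        counter = 0;
        batch = [];
        stream.resume(); // reopen the faucet
      })
      .catch((e) => stream.destroy(e)); // surface processing errors on the stream
  }
});

stream.on('error', (e) => console.error(e));
stream.on('end', () => processBatch(batch)); // flush the final, partial batch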

example with txt

npm i line-by-line --save

const LineByLineReader = require('line-by-line');

let counter = 0;
let batch = [];

const rl = new LineByLineReader('FILE_PATH');

rl.on('line', (line) => {
  console.log('line', line);

  batch.push(line);
  counter++;

  if (counter > 5000) {
    rl.pause();

    setTimeout(() => {
      // YOUR ASYNC PROCESSING
      counter = 0;
      batch = [];
      rl.resume();
    }, 5000);
  }
})
.on('error', (e) => {
  console.error(e);
})
.on('end', () => {
  console.log('end');

  // YOUR ASYNC PROCESSING
});