
I am trying to read a file line by line using Node.js readline. For each line I want to perform some work asynchronously, and then continue until the end of the file.

const readline = require("readline");
const fs = require("fs");

let rl = readline.createInterface({
    input: fs.createReadStream('b'),
    crlfDelay: Infinity
});

rl.on('line', async (line) => {
    console.log('start line');
    await both();
    console.log('end line');
});

rl.on('close', () => {
    console.log('read complete');
});

function one() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('one'), 2000);
    });
}

function two() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 3000);
    });
}
async function both() {
    const ap = [];
    ap.push(one());
    ap.push(two());
    console.log('processing line');
    await Promise.all(ap);
    console.log('line processed');
}

The file b can be any file with some lines, say,

1
2
3
4
5
6
7

The output that I am expecting is something like:

start line
processing line
line processed
end line
.
.
.

However, I am unable to maintain order.

To my understanding, it seems the 'line' event keeps being emitted, which calls the callback again and again before the previous call has finished.

Is there any way to make this event wait until the line in hand has been processed asynchronously (with its various steps running asynchronously), and only then repeat for the next line?

**Important Update:** The file for this use case is going to contain more than 5 GB of CSV text, and we have a memory constraint of <3 GB and a maximum run time of 15 minutes (AWS Lambda).

  • The line event won't wait for asynchronous code in any previous line event - you'll need to create some sort of queue - I wonder if an async generator would be the way to go – Jaromanda X Apr 23 '20 at 10:35

1 Answer

To my understanding, it seems the 'line' event is getting emitted which is calling the callback again and again!

Yes, I think that is the issue as well.

This is similar to the producer-consumer problem.

What you can do is keep a list of pending line events and append to it whenever the 'line' event fires. The only difference from the classic problem is that the buffer here is unbounded, so the producer (the 'line' events being emitted) can never fill it up. The consumer (the function both), however, needs to be reminded to consume the remaining events. If there are no events, the consumer goes to sleep. Every time there is a new event, the producer checks whether the consumer is awake and, if it is not, wakes it up.

Your solution should look like this:

const readline = require("readline");
const fs = require("fs");

let rl = readline.createInterface({
    input: fs.createReadStream('b'),
    crlfDelay: Infinity
});

const lineEventsToProcess = [];
let bothRunning = false;
rl.on('line', (line) => {
    // Add the line event to the list of line events
    lineEventsToProcess.push(line)
    // Both is not running i.e. the consumer is asleep
    if (!bothRunning) {
        both()
    }
});

rl.on('close', () => {
    console.log('read complete');
});

function one() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('one'), 2000);
    });
}

function two() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 3000);
    });
}
async function both() {
    // Set bothRunning to true
    bothRunning = true;

    while(lineEventsToProcess.length > 0) {
        console.log('start line');
        const ap = [];
        ap.push(one());
        ap.push(two());
        console.log('processing line');
        await Promise.all(ap);
        console.log('line processed');

        // Remove the first (oldest) element now that it has been processed
        lineEventsToProcess.shift();
        console.log('end line');
    }

    // Both is not running anymore
    bothRunning = false;
}

I modified it a bit, replacing the line events with setInterval so that I could test it. Here it is, in case you want to test it in the browser or someone has a similar problem:

const lineEventsToProcess = [];
let bothRunning = false;
setInterval(() => {
    // Add the line event to the list of line events
    lineEventsToProcess.push(1)
    // Both is not running i.e. the consumer is asleep
    if (!bothRunning) {
        both();
    }
}, 100);

function one() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('one'), 2000);
    });
}

function two() {
    return new Promise((resolve, reject) => {
        setTimeout(() => resolve('two'), 3000);
    });
}
async function both() {
    // Set bothRunning to true
    bothRunning = true;

    while(lineEventsToProcess.length > 0) {
        console.log('start line');
        const ap = [];
        ap.push(one());
        ap.push(two());
        console.log('processing line');
        await Promise.all(ap);
        console.log('line processed');

        // Remove the first (oldest) element now that it has been processed
        lineEventsToProcess.shift();
        console.log('end line');
    }

    // Both is not running anymore
    bothRunning = false;
}

Comment below if you need more explanation of this.

I am not saying this is the best solution, but it should work. If you want to improve it, I would recommend modularising the code by creating classes for the producer and the consumer. There are plenty of solutions to the producer-consumer problem online.
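
As a rough idea of what that modularisation could look like, here is a minimal sketch that wraps the list and the "is the consumer awake" flag in one class. The name SerialQueue and its methods are made up for this illustration, the 10 ms timer only stands in for the real work, and error handling is left out (a rejected worker would leave the queue stuck), so treat it as a starting point rather than a finished implementation.

```javascript
// Hypothetical helper (not part of Node.js): processes items one at a time, in order.
class SerialQueue {
    constructor(worker) {
        this.worker = worker;  // async function called once per item
        this.items = [];       // pending items (the unbounded buffer)
        this.running = false;  // true while the consumer loop is awake
    }

    // Producer side: enqueue an item and wake the consumer if it is asleep.
    push(item) {
        this.items.push(item);
        if (!this.running) {
            this.drain();
        }
    }

    // Consumer side: handle queued items in FIFO order until the buffer is empty.
    async drain() {
        this.running = true;
        while (this.items.length > 0) {
            const item = this.items.shift();
            await this.worker(item);
        }
        this.running = false;
    }
}

// Usage: the worker stands in for both(); the timer simulates asynchronous work.
const lineQueue = new SerialQueue(async (line) => {
    console.log('start line', line);
    await new Promise((resolve) => setTimeout(resolve, 10));
    console.log('end line', line);
});
['1', '2', '3'].forEach((line) => lineQueue.push(line));
```

With readline, the 'line' handler would then only need to call lineQueue.push(line). The buffer is still unbounded, so this changes the structure of the code, not its memory behaviour.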

Sanil Khurana
  • The callback to readline's line event is async; does it still need to be async? – Pulkit Sharma Apr 23 '20 at 11:47
  • Nope. No reason for it to be async since it doesn't have an await in it. Updated my answer. Does the answer work for you? – Sanil Khurana Apr 23 '20 at 12:02
  • yes, it kind of worked. Thank you! :) However, I proceeded with the use of Generator Function to get a line and process it. Also, thanks to [this](https://stackoverflow.com/questions/43638105/how-to-get-synchronous-readline-or-simulate-it-using-async-in-nodejs) – Pulkit Sharma Apr 23 '20 at 12:33
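
For anyone landing here later, the generator-style approach mentioned in the comments can be sketched with the async iterator that readline interfaces expose in recent Node.js versions (for await...of). This is only an illustration under assumptions: processLine and processFile are hypothetical names, the 10 ms timer stands in for the real per-line work, and the log array exists only to make the ordering visible.

```javascript
const fs = require('fs');
const readline = require('readline');

// Hypothetical stand-in for the real per-line work (one()/two() in the question).
function processLine(line) {
    return new Promise((resolve) => setTimeout(() => resolve(line), 10));
}

// Reads filePath line by line and finishes each line before taking the next one.
async function processFile(filePath) {
    const rl = readline.createInterface({
        input: fs.createReadStream(filePath),
        crlfDelay: Infinity
    });

    const log = [];
    // The loop body, including its awaits, completes before the next line is requested.
    for await (const line of rl) {
        log.push('start line ' + line);
        await processLine(line);
        log.push('end line ' + line);
    }
    // Reached only after every line has been fully processed.
    log.push('read complete');
    return log;
}
```

Calling processFile('b').then((log) => console.log(log.join('\n'))) on the sample file should print the start/end pairs in file order, followed by 'read complete'. The iteration should start right after createInterface, since the interface begins consuming the stream as soon as it is created. How much this buffers internally on a file of several gigabytes is worth measuring rather than assuming.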