This is part of a larger process that I've distilled down to the minimal, reproducible example below, running in node v14.4.0. The code outputs nothing from inside the for loop.

I see only this output in the console:

before for() loop
finished
finally
done

The `for await (const line1 of rl1)` loop never executes its body - it just skips right over it:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

But, if I remove either of the `await once(stream, 'open')` statements, then the for loop does exactly what it is expected to (lists all the lines of the rl1 file). So, apparently, there's some timing problem between the async iterator from the readline interface and the stream. Any idea what could be causing this or how to work around it?

FYI, the `await once(stream, 'open')` is there because of another bug in the async iterator where it does not reject if there's an issue opening the file, whereas `await once(stream, 'open')` causes you to properly get a rejection if the file can't be opened (essentially pre-flighting the open).
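
For reference, a minimal sketch of why that pre-flight works (the file name and the `openReadStream()` helper are just illustrative, not from the original code): `events.once()` rejects its promise if the emitter fires an `error` event before the awaited event, so a bad path surfaces as an ordinary rejection.

const fs = require('fs');
const { once } = require('events');

async function openReadStream(path) {
    const stream = fs.createReadStream(path);
    // events.once() rejects if 'error' fires before 'open',
    // so a missing file becomes a rejected promise right here
    await once(stream, 'open');
    return stream;
}

openReadStream('no-such-file.txt')
    .then(() => console.log('opened'))
    .catch(err => console.log('Got rejected promise:', err.code)); // ENOENT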

If you're wondering why the stream2 code is there, it is used in the larger project; I've reduced this down to a minimal, reproducible example, and only this much of the code is needed to demonstrate the problem.


Edit: In trying a slightly different implementation, I found that if I combine the two `once(stream, "open")` calls in a `Promise.all()`, it then works. So, this works:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const stream2 = fs.createReadStream(file2);
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
        // pre-flight file open to catch any open errors here
        // because of existing bug in async iterator with file open errors
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

This is obviously not supposed to be sensitive to exactly how you wait for the file open; there's a timing bug somewhere. I'd like to find that bug, in either readline or readStream, and file it. Any ideas?

jfriend00
  • I see an existing [May 18th issue](https://github.com/nodejs/node/issues/33463) filed on a similar problem. I have added my example to that issue. And, another [related issue](https://github.com/nodejs/node/issues/34219) from July 5th. – jfriend00 Jul 13 '20 at 23:37
  • And, here's [the issue from last December](https://github.com/nodejs/node/issues/30831) that causes me to have to put in the `await once(stream, 'open')` in order to properly catch errors when opening the file. – jfriend00 Jul 13 '20 at 23:42
  • That [December bug](https://github.com/nodejs/node/issues/30831) arose out of this stackoverflow question: [How to handle error from fs readline interface async iterator](https://stackoverflow.com/questions/59216364/how-to-handle-error-from-fs-readline-interface-async-iterator). – jfriend00 Jul 13 '20 at 23:56
  • My first thought based on the difference between your two code snippets is that a `data` event is fired before you attach the read stream to the readline interface. Awaiting `open` before doing something with the stream likely has implications since event emitters are synchronous and promise resolution is not. – Jake Holzinger Jul 14 '20 at 00:00
  • I think you're right that someone is throwing away data somewhere in error. But as soon as either the stream or the readline interface has a `data` event listener installed, it should be listening for that data as it comes in and storing it for future iterations of the iterator. That's the only way such an async iterator can possibly work because data doesn't arrive in perfect line by line chunks so it has to be buffered when you get data beyond what is currently being asked for. And, that should work just fine (if properly coded) whether the file is already open or not. – jfriend00 Jul 14 '20 at 00:05
  • FYI, as a comment on the general readline module, the whole `readline` interface is very poorly implemented to work with file streams and the code is a massive set of `if` statements related to whether it's a tty or not. This is just a messy and flawed implementation. At worst, it should have been a class hierarchy with a tty implementation and a stream implementation. At best, streams would just have their own line-by-line mechanism that isn't polluted by all the tty stuff and all these iterator bugs since it's such a core feature of reading files. – jfriend00 Jul 14 '20 at 00:08
  • I think the issue here is that `line` events are not buffered, nor is it indicated in the `readline` module that they are, so you attach the stream to the readline interface and it begins emitting lines as you await the second stream's `open` event, by the time you start iterating the lines whatever was emitted before is lost. – Jake Holzinger Jul 14 '20 at 00:34
  • @JakeHolzinger - Then, the async iterator with the readline interface is just useless if you can't do any other asynchronous operations while using it. But, there are also other bugs that it just misses lines. I think it's just a flawed implementation. It should be buffering lines and pausing the stream when it has lines waiting. It should be both buffering and implementing flow control. Otherwise, the async iterator can never work reliably (which it apparently doesn't). – jfriend00 Jul 14 '20 at 00:37
  • It's not completely useless, but it assumes that you will begin listening to `line` events synchronously. If you simply created the readline interface immediately before iterating it you shouldn't have a problem. I'm not sure if that meets your needs, but it would work. – Jake Holzinger Jul 14 '20 at 00:39
  • @JakeHolzinger - Is it documented that you have to use it immediately without any intervening asynchronous operations? And, why would an ASYNCHRONOUS iterator have a requirement that you can't use it around other asynchronous operations? And, it's got so many other bugs that I've encountered that I would rather reimplement line-by-line processing myself (or use an external module) than use it. When you insert one line of code and it breaks for some undocumented timing reasons, that's just a broken design. It should be fixed to be reliable or removed. – jfriend00 Jul 14 '20 at 00:42
  • I agree, and I personally would not use it either. A simple transform stream implementation could do a much better job, and I think it would also support async iterators out of the box, though I have to admit I haven't used that stream feature myself. – Jake Holzinger Jul 14 '20 at 00:48
  • No it is not documented, but without looking at the code and with the knowledge that the readline module existed before it had async iterator support I think it's safe to assume the async iterator is backed by a line event listener. You can reproduce your issue without async iterators. Just replace your loop with a line event listener followed by `await once(stream1, 'end');` – Jake Holzinger Jul 14 '20 at 00:55

3 Answers


It turns out the underlying issue is that readline.createInterface(), immediately upon being called, adds a data event listener (code reference here) and resumes the stream to start it flowing:

input.on('data', ondata);

and

input.resume();

Then, in the ondata listener, it parses the data for lines and, when it finds a line, it fires a line event here:

for (let n = 0; n < lines.length; n++)
  this._onLine(lines[n]);

But, in my examples, there were other asynchronous things happening between the time that readline.createInterface() was called and the async iterator was created (that would listen for the line events). So, line events were being emitted and nothing was yet listening for them.

So, to work properly, readline.createInterface() REQUIRES that whatever is going to listen for the line events MUST be attached synchronously after calling readline.createInterface(), or there is a race condition and line events may get lost.
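
For illustration, here is a minimal sketch of the race (the file name is just an example): the interface starts consuming the stream immediately, so a `line` listener attached even one tick later can miss lines, depending on how quickly data arrives.

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('data/numbers.txt'),
    crlfDelay: Infinity
});

// createInterface() has already attached its 'data' listener and resumed
// the stream, so 'line' events may start firing right away. Attaching our
// listener asynchronously means any lines emitted in between are dropped.
setImmediate(() => {
    rl.on('line', (line) => console.log(line));
});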


In my original code example, a reliable way to work around it is to not call readline.createInterface() until after I've done the await once(...). Then, the async iterator will be created synchronously, right after readline.createInterface() is called.

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const stream2 = fs.createReadStream(file2);
        // wait for both files to be open to catch any "open" errors here
        // since readline has bugs about not properly reporting file open errors
        // this await must be done before either call to readline.createInterface()
        // to avoid race conditions that can lead to lost lines of data
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

One way to fix this general issue would be to change readline.createInterface() so that it does not add the data listener or resume the stream UNTIL somebody adds a line event listener. This would prevent data loss. It would allow the readline interface object to sit there quietly, without losing data, until the receiver of its output was actually ready. This would work for the async iterator, and it would also prevent other uses of the interface that had other asynchronous code mixed in from losing line events.
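
As a rough sketch of that idea (hypothetical code, not the actual readline internals), the emitter's built-in 'newListener' event could be used to defer starting the stream until a 'line' listener exists:

const { EventEmitter } = require('events');

// Hypothetical lazy-start line emitter: nothing is read from the input
// until the first 'line' listener is attached, so no lines can be lost.
class LazyLineEmitter extends EventEmitter {
    constructor(input) {
        super();
        let remaining = '';
        let started = false;
        // 'newListener' fires just before a listener is actually added
        this.on('newListener', (event) => {
            if (event !== 'line' || started) return;
            started = true;
            input.setEncoding('utf8');
            // attaching a 'data' listener switches the stream to flowing mode
            input.on('data', (chunk) => {
                const lines = (remaining + chunk).split(/\r?\n/);
                remaining = lines.pop();
                for (const line of lines) this.emit('line', line);
            });
            input.on('end', () => {
                if (remaining !== '') this.emit('line', remaining);
                this.emit('close');
            });
        });
    }
}

With this shape, it would not matter how many ticks pass before the consumer attaches (including the async iterator, which registers a 'line' listener under the hood), because the stream does not start flowing until then.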

A note about this has been added to a related open readline bug issue here.

jfriend00

You can make this work as expected if you create the async iterator immediately after constructing the readline interface. If you wait to create the async iterator, you may lose some lines, since line events are not buffered by the readline interface; once the async iterator exists, it buffers them.

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const rl1Iterator = rl1[Symbol.asyncIterator]();

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1Iterator) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("stream.txt", "stream.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

Based on the discussion in the comments, this still may not be a desirable solution, as the readline module has various other issues, but I figured I would add an answer to resolve the problem as indicated in the original question.

Jake Holzinger
  • This does appear to work. If this is the main issue, then why does my second code example using `Promise.all()` work at all? It has the same issue you speak of avoiding here. I just don't understand why my other code example that still has the same issue you say to avoid here works? It seems like this is not the entire explanation for the problem or the `Promise.all()` example would not work either. – jfriend00 Jul 14 '20 at 01:29
  • I cannot say for sure, but it's a race condition (or timing issue, as you called it). Perhaps the additional CPU cycles needed to create the read stream and readline interface make that much of a difference. To be fair, if I put `await new Promise((resolve) => setTimeout(resolve));` before the loop, the same behavior as the first example is present. – Jake Holzinger Jul 14 '20 at 01:39
  • There may also be optimizations in promise resolution such that the `Promise.all()` gets resolved in one microtask (assuming both files open before promise resolution is done), whereas the first example requires two microtasks, which would mean it performs slower, thus exacerbating the race condition. – Jake Holzinger Jul 14 '20 at 01:48
  • In looking at `readline.createInterface()`, it does immediately install a `data` listener on the stream and it resumes the stream. So, data will start flowing. I've got a malfunctioning debugger so I've not been able to see exactly what it does with those data events when they arrive, but it is likely firing `line` events while nobody is listening for them. Yep, I've confirmed it. It is a race condition on whether any `line` events fire before the async iterator is created. I don't know why that does not happen in the `Promise.all()` example. – jfriend00 Jul 14 '20 at 01:48
  • A more fundamental way to fix this might be to have readline not add its `data` listener and not resume the stream until someone installs a `line` listener. This would appear to be a generic problem for the readline interface, as you have to synchronously install your `line` listener or you miss data. It doesn't appear that it has to be designed that way. – jfriend00 Jul 14 '20 at 01:50
  • Yes, I agree, though it may be this way for historic/backwards compatible reasons. – Jake Holzinger Jul 14 '20 at 01:51
  • I'd be hard pressed to come up with a reason why code paths that lose data are preserved for historical reasons. More likely, it was just written this way before promises, and before people did as much async development as they do now, so the problem was rarely encountered. There are multiple bugs filed on readline for "losing data". – jfriend00 Jul 14 '20 at 01:53
  • I'm not a node maintainer so I couldn't tell you, but I think the readline module predates the stream API overhaul, otherwise it would probably just be an object mode stream rather than a stream with a special `line` event. No need to speculate though, I agree that your suggestion would be an improvement. – Jake Holzinger Jul 14 '20 at 01:59
  • Thanks for your help in figuring this out Jake. `readline.createInterface()` carries a number of caveats. And, even more if using it in combination with the async iterator. Besides this issue, it also doesn't properly handle errors in opening or reading from the input stream (separate bug filed on that - which some people have been working on, but has not made it into a release yet). – jfriend00 Jul 14 '20 at 02:17

The readline module could also be replaced with a simple Transform stream using the more modern stream API. The modern stream API supports async iterators out of the box, as well as backpressure (e.g. the write side of the stream (file reading) will pause until the read side of the stream (line reading) is consumed).

const fs = require('fs');
const { Transform } = require('stream');

function toLines() {
    let remaining = '';
    return new Transform({
        writableObjectMode: false,
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            try {
                // split on CRLF or LF; the last element may be a partial line
                const lines = (remaining + chunk).split(/\r?\n/g);
                // hold back the partial line until the next chunk (or flush)
                remaining = lines.pop();
                for (const line of lines) {
                    this.push(line);
                }
                callback();
            } catch (err) {
                callback(err);
            }
        },
        flush(callback) {
            // end of input: emit any trailing text without a final newline
            if (remaining !== '') {
                this.push(remaining);
            }
            callback();
        }
    });
}


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
        const rl1 = stream1.pipe(toLines());

        const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
        const rl2 = stream2.pipe(toLines());

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

This example doesn't support the crlfDelay option of the readline module, but the algorithm could be modified to do something similar. It also (as far as I can tell) has better error handling than is supported by the readline module.
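
One caveat worth a hedged sketch (the `lineStream()` helper name is just illustrative): `.pipe()` does not forward errors from the source stream to the destination, so a failed file open would not, by itself, reject the `for await` loop. Forwarding the error manually (or using `stream.pipeline()`) covers that case, because destroying the transform with an error makes its async iterator reject:

const fs = require('fs');

function lineStream(file) {
    const source = fs.createReadStream(file, { encoding: 'utf8' });
    const lines = toLines();
    // .pipe() does not propagate source errors, so forward them; destroying
    // the transform with an error makes its async iterator reject
    source.on('error', (err) => lines.destroy(err));
    return source.pipe(lines);
}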

Jake Holzinger
  • Wouldn't this have an issue if you get a chunk that doesn't have a line boundary in it or the end of a chunk that isn't actually a full line? – jfriend00 Jul 14 '20 at 03:46
  • If a `chunk` does not have a line boundary it gets appended to the previous `remaining` (e.g. `remaining=remaining+chunk`). If `flush` is called, then whatever is in `remaining` is considered a line. So yes, it is possible that the `remaining` data is not technically a line since it may not have a proper line ending. You could imagine a large text file with no line endings, does the text in the file exist on a line at that point? What about a file that does not end in a line ending? Is the last bit of text a line? – Jake Holzinger Jul 14 '20 at 04:18