I'm creating a very simple Node.js utility to process each record in a text file (line by line), but it is surprisingly difficult to handle the following scenario because of Node's inherently asynchronous nature:

  1. Open connection to database
  2. Read each line of a text file
  3. Based on conditions within the processed text of the line, look up a record in the database
  4. Upon completion of reading the text file, close the database connection

The challenge I face is that the text file is read line by line (using the 'readline' module) by attaching a listener to the 'line' event the module emits. The lines of the file are all processed rapidly and the queries to the database pile up in a queue. I have tried many approaches to create an essentially synchronous process, to no avail. Below is my latest attempt, which is definitely full of async/await functions. I'm a longtime developer but new to Node.js, so I know I'm missing something simple. Any guidance would be greatly appreciated.

const { Pool, Client } = require('pg')

const client = new Client({
  user: '*****',
  host: '****',
  database: '*****',
  password: '******#',
  port: 5432,
})


client.connect()
  .then(() => {

    console.log("Connected");

    console.log("Processing file");

    const fs = require('fs');
    const readline = require('readline');
    const instream = fs.createReadStream("input.txt");
    const outstream = new (require('stream'))();
    const rl = readline.createInterface(instream, outstream);

    rl.on('line', async function (line) {

        var callResult;

        if (line.length > 0) {

            var words = line.replace(/[^0-9a-z ]/gi, '').split(" ");
            var len = words.length;

            for (var i = 0; i < words.length; i++) {
                if (words[i].length === 0) {         
                  words.splice(i, 1);
                  i--;
                } else {
                    words[i] = words[i].toLowerCase();  
                }
              }

            for (var i = 0; i < words.length; i++) {

                if (i <= words.length - 3) {

                    callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim() + " " + words[i + 2].trim());

                    if (!callResult) {

                        callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim());

                        if (!callResult) {

                            callResult = await isKeyPhrase(words[i].trim());
                        } 
                    };

                } else if (i <= words.length - 2) {

                    callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim());

                    if (!callResult ) {

                        callResult = await isKeyPhrase(words[i].trim());

                    };

                } else if (i < words.length) {

                    callResult = await isKeyPhrase(words[i].trim());
                }
            } 

        }       // (line.length > 0)        

    });

    rl.on('close', function (line) {
        console.log('done reading file.');

        // stubbed out because queries are still running
        //client.end();

    });


  }).catch( (err) => {
    console.error('connection error', err.stack);
});

async function isKeyPhrase(keyPhraseText) {

    var callResult = false;

    return new Promise(async function(resolve, reject) {

        const query = {
          name: 'get-name',
          text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
          values: [keyPhraseText],
          rowMode: 'array'
        }

        // promise
        await client.query(query)
          .then(result => {

            if (result.rowCount == 1) {

                console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);

                calResult = true;

            } 

          }).catch(e => {

            console.error(e.stack)
            console.log(e.stack);
            reject(e);

        });

        resolve(callResult);

    });

}
tequacreek

1 Answer


welcome to StackOverflow. :)

Indeed, there's no sensible way to read a file synchronously while interacting with a database per line, and no feasible way at all if the file is bigger than, say, an eighth of your memory.

This doesn't mean, however, that there's no way to write sane code for this. The only problem is that standard Node streams (including readline) do not wait for async code.

I'd recommend using scramjet, a functional stream programming framework designed pretty much for your use case (disclaimer: I'm the author). Here's how the code would look:

const fs = require('fs');
const { Pool, Client } = require('pg')
const { StringStream } = require("scramjet");

const client = new Client({
    user: '*****',
    host: '****',
    database: '*****',
    password: '******#',
    port: 5432,
})

client.connect()
    .then(async () => {
        console.log("Connected, processing file");


        return StringStream
            // this creates a "scramjet" stream from input.
            .from(fs.createReadStream("input.txt"))
            // this splits the stream line by line
            .lines()
            // the next line is just to show when the file is fully read
            .use(stream => stream.whenEnd.then(() => console.log("done reading file.")))
            // this lowercases, strips punctuation and splits into words,
            // like the first "for" loop in your code
            .map(line => line.toLowerCase().replace(/[^0-9a-z ]+/g, '').split(" ").filter(word => word.length > 0))
            // this gets rid of lines with no words in them
            .filter(words => words.length > 0)
            // this looks up key phrases like the second "for" loop in your code
            .map(async words => {
                for (var i = 0; i < words.length; i++) {
                    const callResult = await isKeyPhrase(words.slice(i, i + 3).join(" "));
                    if (callResult) return callResult;
                }
            })
            // this runs the above list of operations to the end and returns a promise.
            .run();
    })
    .then(() => {
        console.log("done processing file.");
        client.end();
    })
    .catch((e) => {
        console.error(e.stack);
    });


async function isKeyPhrase(keyPhraseText) {

    const query = {
        name: 'get-name',
        text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
        values: [keyPhraseText],
        rowMode: 'array'
    };

    const result = await client.query(query);

    if (result.rowCount > 0) {
        console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);
        return true;
    }

    return false;
}

I compacted and optimized your code in some places, but in general this should get you what you want: scramjet runs each operation asynchronously and will wait until all the operations have ended.
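One caveat: the `.map` above only checks the (up to) three-word window starting at each index, not your original fallback from three words to two to one. If you need that exact behavior, the inner loop could be sketched like this (plain JavaScript, independent of scramjet; `isKeyPhrase` is assumed to be your async database lookup):

```javascript
// Sketch of the question's exact fallback: at each position try the
// 3-word phrase, then 2-word, then 1-word, mirroring the nested
// "if (!callResult)" chain. `isKeyPhrase` is assumed to be an async
// lookup resolving to true/false.
async function findKeyPhrase(words, isKeyPhrase) {
  for (let i = 0; i < words.length; i++) {
    // Longest candidate first; the slice is clamped at the array end.
    for (let len = Math.min(3, words.length - i); len >= 1; len--) {
      const phrase = words.slice(i, i + len).join(" ");
      if (await isKeyPhrase(phrase)) return phrase;
    }
  }
  return null;
}
```

Inside the scramjet pipeline, this would replace the body of the `.map(async words => ...)` step.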

Michał Karpacki
    Michael, thank you. I will study this. I appreciate you taking the time to create a working example for me. – tequacreek Oct 02 '18 at 21:09
  • No problem. It's a good feeling to know I made something useful. Let me know if you run into problems with the above code... and if it works, I'd be happy if you accepted the answer or voted up, or even both. ;) – Michał Karpacki Oct 02 '18 at 22:20