0

I feel like things are running out of order here. I want to establish a connection to a MySQL database, then read in a CSV file. Row by row, I want to grab the name and run a query. I assumed that my sqlSelectQuery function, which returns a promise, would wait for the promise to resolve before moving on to the next row. What am I missing here?

const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
const csv = require('fast-csv');
const config = require('./config')

// Single shared MySQL connection, configured from the values in ./config.
const connection = mysql.createConnection({
    user: config.user,
    password: config.password,
    database: config.database,
    host: config.host
});

// Open the connection. connect() is asynchronous: this callback runs later,
// after the synchronous code further down the file has already executed.
connection.connect((err) => {
    if (err) {
        // Log the underlying error as well, so a failed connection can
        // actually be diagnosed instead of only being announced.
        console.error('Error connecting to Db:', err);
        return;
    }
    console.log('Connection established');
});

// Stream data.csv and start one lookup per parsed row.
//
// NOTE: the stream does not wait for a promise created inside a 'data'
// handler. The handler returns immediately, so every row's query is started
// as soon as the row is parsed and the results are logged later, once the
// queries complete. That is why "Parsed N rows" can print before any result.
fs.createReadStream(path.resolve(__dirname,'data.csv'))
    .pipe(csv.parse({ headers: true }))
    .on('error', error => console.error("error", error))
    .on('data', row => { // need to get this to block
        sqlSelectQuery(row)
            .then(result => console.log("result: ", result))
            // Without a rejection handler a failed query would surface as an
            // unhandled promise rejection instead of a readable log line.
            .catch(error => console.error("error", error));
    })
    .on('end', rowCount => console.log(`Parsed ${rowCount} rows`));




/**
 * Looks up the loan whose business_name equals the CSV row's BorrowerName.
 *
 * @param {Object} row - Parsed CSV row; reads `BorrowerName` and `InitialApprovalAmount`.
 * @returns {Promise<Object>} Resolves with
 *   `{ business_name, loan_range, loan_amount, count: 1 }` when exactly one
 *   record matches, otherwise with
 *   `{ business_name, loan_range: "", loan_amount: "", unique: <match count> }`.
 *   Rejects with the driver's error when the query fails.
 */
const sqlSelectQuery = (row) => {
    return new Promise((resolve, reject) => {
        console.log("inside promise");
        const selectQuery = 'SELECT * FROM loans where business_name = ?;';
        connection.query(selectQuery, [row.BorrowerName], (err, rows) => {
            if (err) {
                // Stop here: `rows` may be undefined when the query fails, so
                // falling through to `rows.length` would throw inside the
                // driver callback after the promise was already rejected.
                reject(err);
                return;
            }

            if (rows.length === 1) {
                const [match] = rows;
                resolve({
                    business_name: match.business_name,
                    loan_range: match.loan_range,
                    loan_amount: row.InitialApprovalAmount,
                    count: 1
                });
                return;
            }

            // Zero or several matches: report how many were found instead.
            resolve({
                business_name: row.BorrowerName,
                loan_range: "",
                loan_amount: "",
                unique: rows.length
            });
        });
    });
};

my console looks like this
inside promise
inside promise  //20 times (I have 20 rows)
Parsed 20 rows
Connection established
result:  {....}
result: {...}....
eurodollars
  • 47
  • 1
  • 8

2 Answers2

1

I found this answer: "nodejs async await inside createReadStream". I need to pause the stream before running the query and resume it once the query has finished:

.on('data', async (row) => { // need to get this to block
        stream.pause();
        await sqlSelectQuery(row).then(result => console.log("result: ", result))
        stream.resume();
    })

The issue now is that my .on('end') runs before the last row.

eurodollars
  • 47
  • 1
  • 8
0

You could add each row to a rowsToProcess array, then, once the file data is read, process each row one by one:

const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
const csv = require('fast-csv');
const config = require('./config')

// Single shared MySQL connection, configured from the values in ./config.
const connection = mysql.createConnection({
    user: config.user,
    password: config.password,
    database: config.database,
    host: config.host
});

// Connect first, then read the whole CSV into memory. Queries only start once
// the file has been fully parsed, so rows are processed one after another.
connection.connect((err) => {
    if (err) {
        console.error('Error connecting to Db:', err);
        return;
    }
    console.log('Connection established');
    const rowsToProcess = [];
    fs.createReadStream(path.resolve(__dirname,'data.csv'))
        .pipe(csv.parse({ headers: true }))
        .on('error', error => console.error("error", error))
        .on('data', row => {
            // Add row to process.
            rowsToProcess.push(row);
        })
        .on('end', async () => {
            try {
                await processRows(rowsToProcess);
                console.log("processRows: complete.");
            } catch (error) {
                // The emitter ignores the promise returned by this handler, so
                // a failed query has to be handled here.
                console.error("error", error);
            } finally {
                // Close the connection so the process can exit when done.
                connection.end((endErr) => {
                    if (endErr) {
                        console.error("error", endErr);
                    }
                });
            }
        });
});

/**
 * Runs sqlSelectQuery for every row, strictly one at a time, logging progress
 * before each query and the query's result after it.
 *
 * @param {Array<Object>} rowsToProcess - Rows collected from the CSV file.
 * @returns {Promise<void>} Settles once the last row has been processed.
 */
async function processRows(rowsToProcess) {
    console.log(`Read ${rowsToProcess.length} row(s) from csv file...`)
    // `position` is the 1-based row number used in the log messages.
    for (let position = 1; position <= rowsToProcess.length; position += 1) {
        console.log(`processing row ${position} of ${rowsToProcess.length}...`);
        // Awaiting inside the loop is what keeps the queries sequential.
        const outcome = await sqlSelectQuery(rowsToProcess[position - 1]);
        console.log(`row ${position} result:`, outcome);
    }
}

/**
 * Looks up the loan whose business_name equals the CSV row's BorrowerName.
 *
 * @param {Object} row - Parsed CSV row; reads `BorrowerName` and `InitialApprovalAmount`.
 * @returns {Promise<Object>} Resolves with
 *   `{ business_name, loan_range, loan_amount, count: 1 }` when exactly one
 *   record matches, otherwise with
 *   `{ business_name, loan_range: "", loan_amount: "", unique: <match count> }`.
 *   Rejects with the driver's error when the query fails.
 */
const sqlSelectQuery = (row) => {
    return new Promise((resolve, reject) => {
        console.log("Processing row:", row);
        const selectQuery = 'SELECT * FROM loans where business_name = ?;';
        connection.query(selectQuery, [row.BorrowerName], (err, rows) => {
            if (err) {
                // Stop here: `rows` may be undefined when the query fails, so
                // falling through to `rows.length` would throw inside the
                // driver callback after the promise was already rejected.
                reject(err);
                return;
            }

            if (rows.length === 1) {
                const [match] = rows;
                resolve({
                    business_name: match.business_name,
                    loan_range: match.loan_range,
                    loan_amount: row.InitialApprovalAmount,
                    count: 1
                });
                return;
            }

            // Zero or several matches: report how many were found instead.
            resolve({
                business_name: row.BorrowerName,
                loan_range: "",
                loan_amount: "",
                unique: rows.length
            });
        });
    });
};
Terry Lennox
  • 29,471
  • 5
  • 28
  • 40
  • 1
    Thank you for adding in the connection.end(), I had forgotten about that. The issue I am having now is that my last row shows up after the 'end'. Parsed 20 rows result{....} //this is the final result – eurodollars Mar 10 '21 at 14:24