
await is not blocking as expected when a block of code updates the db (using postgres / node)

https://node-postgres.com

I have a list of async function calls; each call updates a database, and each subsequent call works on data updated by the previous call.

There are about eight calls in a row, and each call must update the complete set of data it is working with, 100% to completion, before going to the next.

I tried to make everything not async, but it appears I am forced to make everything async/await because of the library I am using (postgres / node).


Each function call must complete 100% before going on to the next function call, because the next step does a select on rows where a field is not null (where the previous step fills in a value).

I have an await in front of each call, and each call does something (see code below):

  1. loads the db from a csv,
  2. next step selects all rows just inserted, calls an API and updates the database,
  3. and so on,

but at one point, when the next function executes, NONE of the rows have been updated (as I trace through and verify, a SQL statement returns nothing back),

the code seems to pass right through to the second function call without blocking, not honoring the await or completing its code block.

If I comment out some of the latter rows (dependent on the previous), and let the program run to completion, the database gets updated.

There is nothing functionally wrong with the code, everything works, just not from beginning to completion.

After running two function calls at the beginning, letting that run, I can then comment out those rows, uncomment the later rows in the flow, and run again, and everything works as expected, but I cannot run to completion with both uncommented.

What can I do to make sure each function call completes 100%, has all updates completed in the database, before going to the next step?

async/await is not working for me.

This is not pseudo-code; it is the actual executing code I am working with, with only the function names changed. It is real working code, cut-and-pasted directly from my IDE.

// these are functions I call below (each in their own .js)
const insert_rows_to_db_from_csv = require('./insert_rows_to_db_from_csv')
const call_api_using_rows_from_function_above = require('./call_api_using_rows_from_function_above')
const work_with_rows_updated_in_previous_call_above = require('./work_with_rows_updated_in_previous_call_above')
const and_so_on = require('./and_so_on')
const and_so_on_and_on = require('./and_so_on_and_on')
const and_so_on_and_on_and_on = require('./and_so_on_and_on_and_on')

const fs = require('fs')
const csv = require('csv-parser')   // assuming csv-parser for the csv() stream below

// each of the above exports a main() function, so I can call func.main()
// just like the one defined below (this is my main() entry point)

module.exports = {
    main: async function (csvFilePath) {
        console.log('service: upload.main()')
        try {
            const csvList = []

            let rstream = fs.createReadStream(csvFilePath)
                .pipe(csv())
                .on('data', (data) => csvList.push(data))
                .on('end', async () => {
                    let num_rows = csvList.length

                    //step one (if I run these two, with step two calls below commented out, this works)
                    await insert_rows_to_db_from_csv.main(csvList);
                    await call_api_using_rows_from_function_above.main();

                    // step two
                    // blows up here, on the next function call,
                    // no rows selected in sql statements, must comment out, let the above run to
                    // completion, then comment out the rows above, and let these run separate
                    await work_with_rows_updated_in_previous_call_above.main();   // sets
                    await and_so_on.main();
                    await and_so_on_and_on.main();
                    await and_so_on_and_on_and_on.main();
                })
        } catch (err) {
            console.log(err.stack)
        } finally {
        }
    }
};

Here is the one-liner I am using to call the insert/update to the DB:

 return await pool.query(sql, values);

that's it, nothing more. This is from using: https://node-postgres.com/

npm install pg


PART 2 - continuing on,

I think the problem might be here. This is where I do each API call and then the insert (which the next function call depends on); there is some code smell here that I can't sort out.

`processBatch(batch)` is called, which calls the API and gets a response back; within that it calls `handleResponseDetail(response)`, where the insert happens. I think the problem is here, if there are any ideas?

this is a code block inside: await call_api_using_rows_from_function_above.main();

It completes with no errors, inserts rows, and commits; then the next function is called, and that next function finds none of the rows inserted here. But the await on the entire main() blocks and waits, so I don't understand.

/**
 * API call, and within call handleResponse which does the DB insert.
 * @param batch
 * @returns {Promise<*>}
 */
async function processBatch(batch) {
    console.log('Processing batch');
    return await client.send(batch).then(res => {
        return handleResponseDetail(res);
    }).catch(err => handleError(err));
}

// should this be async?
function handleResponseDetail(response) {

    response.lookups.forEach(async function (lookup) {

        if (typeof lookup.result[0] == "undefined") {   // result[0] is Candidate #0
            ++lookup_fail;
            console.log('No response from API for this address.')
        } else {
            ++lookup_success;

            const id = await insert(lookup);
        }
    });
}
  • What is `cvsList`? – jfriend00 Dec 02 '19 at 19:01
  • Please show actual and real database calls, not your pseudo code. We can only compare what you're doing to the actual database API if you show us the actual and REAL code. You have to use `await` properly for it to do what you want and we cannot see if you're using it correctly without seeing REAL code. I have no idea why people here think pseudo-code makes a better question than real code. It does NOT. Real code always makes a much better question where we are much more likely to be able to see what you're doing wrong and/or offer you the best solutions. – jfriend00 Dec 02 '19 at 19:02
  • Without seeing your real database code, the only possible answer is to ***"use `await` correctly with your database API"***. But, since you don't show us the actual code, we have no idea where you are using the combination of `await` and the database API incorrectly. – jfriend00 Dec 02 '19 at 19:07
  • We can only guess as to how you might be using the `pg` package wrong given that these aren't actual function calls. If the actual code is far too verbose, you can just snip the SQL and leave the rest - the important part is which node-postgres functions you're using and how. – Klaycon Dec 02 '19 at 19:09
  • A few other comments here. Your `try/catch()` is only going to catch an error from `fs.createReadStream()` and nothing else. You will need `error` handlers on your stream and `try/catch` around your `await` statements for full error handling. Also, a caller of your `async main()` function won't have any idea when it completes. – jfriend00 Dec 02 '19 at 19:09
  • cvsList is a list of rows read in from a csv. – user10664542 Dec 02 '19 at 19:10
  • The code posted is real code, as real as it gets, copied directly from working code. Only the function call names have been changed. Here is the one liner I am using to update the DB with, as requested: return await pool.query(sql, values); – user10664542 Dec 02 '19 at 19:10
  • Please EDIT your question to show the REAL database code you're using. You need to clarify the question, not just put single db statements into comments. The question itself needs to be clear. – jfriend00 Dec 02 '19 at 19:11
  • What's all the `.main()` stuff in your pseudo-code? I don't see any of that in a promise interface to postgres. – jfriend00 Dec 02 '19 at 19:15
  • I updated the code above with two line changes that should provide everything requested in all of the above. The question itself is crystal clear, and there is enough information provided there now to answer the question, if you can, thank you. There is no additional information I can provide, that is everything. The one liner to execute sql statements is provided, and that is the line I am calling in every function. – user10664542 Dec 02 '19 at 19:16
  • With any answer, if you can please do more than just a drive by, complete code examples implementing your suggestion would be greatly appreciated. There are many answers and comments and concerns outside of the scope of the question about a number of things. I am simply trying to solve a very narrow focused problem, the rest is appreciated and will be taken into consideration, but does not solve the direct problem posted, thank you. – user10664542 Dec 02 '19 at 19:19
  • What does the code snippet you provided have to do with `await pool.query(sql, values);`? At this point... they appear to be unrelated. – Kevin B Dec 02 '19 at 19:33
  • 1
    `main: async function (csvFilePath) {` doesn't await anything, nor should it, because nothing inside it can be awaited. a lot of the code in question doesn't make much sense. – Kevin B Dec 02 '19 at 19:34
  • 1
    Without seeing how all of these mysterious `call_api_using_rows_from_function_above.main();` functions are implemented, we can only wildly stab at what the problem might be. My guess is that one or more of your functions is not marked async/does not await other asynchronous operations, thus resulting in one or more of the queries to be scheduled later in the event loop than the rest of your code. – Klaycon Dec 02 '19 at 19:34
  • Btw, there's no point in using a streaming csv parser if all you do is collect the results in an array before further processing. – Bergi Dec 02 '19 at 23:10

2 Answers


Given the code block from your Part 2 edit, the problem is now clear: all of your insert()s are being scheduled outside of the blocking context of the rest of your async/await code! This is because of that .forEach, see this question for more details.

I've annotated your existing code to show the issue:

function handleResponseDetail(response) { //synchronous function

    response.lookups.forEach(async function (lookup) { //asynchronous function
        //these async functions all get scheduled simultaneously
        //without waiting for the previous one to complete - that's why you can't use forEach like this
        if (typeof lookup.result[0] == "undefined") {   // result[0] is Candidate #0
            ++lookup_fail;
            console.log('No response from API for this address.')
        } else {
            ++lookup_success;

            const id = await insert(lookup); //this ONLY blocks the inner async function, not the outer `handleResponseDetail`
        }
    });
}

Here is a fixed version of that function which should work as you expect:

async function handleResponseDetail(response) {

    for(const lookup of response.lookups) {

        if (typeof lookup.result[0] == "undefined") {   // result[0] is Candidate #0
            ++lookup_fail;
            console.log('No response from API for this address.')
        } else {
            ++lookup_success;

            const id = await insert(lookup); //blocks handleResponseDetail until done
        }
    }
}

Alternatively, if the order of insertion doesn't matter, you can use Promise.all for efficiency:

async function handleResponseDetail(response) {

    await Promise.all(response.lookups.map(async lookup => {

        if (typeof lookup.result[0] == "undefined") {   // result[0] is Candidate #0
            ++lookup_fail;
            console.log('No response from API for this address.')
        } else {
            ++lookup_success;

            const id = await insert(lookup);
        }
    })); //waits until all insertions have completed before returning
}

To reiterate, you cannot easily use .forEach() with async/await because .forEach() simply calls the given function for each element of the array synchronously, with no regard for awaiting each promise before calling the next. If you need the loop to block between each element, or to wait for all elements to complete processing before returning from the function (this is your use case), you need to use a different for loop or alternatively a Promise.all() as above.
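The difference is easy to demonstrate with a stub async task (`delay` and `results` below are illustrative, not from the question):

```javascript
// Demonstrates why forEach with async callbacks does not serialize awaits,
// while a for...of loop does.
const results = [];

const delay = (ms, label) =>
    new Promise(resolve => setTimeout(() => {
        results.push(label);
        resolve();
    }, ms));

async function withForEach() {
    // All three callbacks start immediately; forEach ignores the returned promises,
    // so this function returns BEFORE any delay completes.
    [30, 20, 10].forEach(async ms => {
        await delay(ms, `forEach-${ms}`);
    });
}

async function withForOf() {
    // Each iteration waits for the previous one: strict source order.
    for (const ms of [30, 20, 10]) {
        await delay(ms, `forOf-${ms}`);
    }
}

async function demo() {
    await withForEach();                          // resolves instantly; pushes land later, shortest first
    await new Promise(r => setTimeout(r, 100));   // let the stray callbacks finish
    results.length = 0;
    await withForOf();                            // resolves only after all three pushes, in order
    return results;                               // [ 'forOf-30', 'forOf-20', 'forOf-10' ]
}

demo().then(r => console.log(r));
```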

Klaycon
  • OK, making those changes, based on the last response resolved the problem (using the alternate looping mechanism), I will test/explore the second solution, Promise.all() as well. All responses were helpful to fix and refine the code, and I learned a few things. This really saved me a lot of time, and I was able to move on to other things to meet a deadline, and would like to send a cash reward for your thorough response (@Klaycon), sometime before Christmas (any Crypto that you accept), or by any other means. Thank you. – user10664542 Dec 03 '19 at 01:01

What your main function currently does is merely create the stream, assign the listeners, and return immediately. It does not wait for the listeners to fire the way you are trying to have it do.

You need to extract your file-reading logic into another function that returns a Promise which resolves only when the entire file has been read, then `await` that Promise inside `main`:

function getCsvList(csvFilePath) {
  return new Promise((resolve, reject) => {
    const csvList = []
    fs.createReadStream(csvFilePath)
      .pipe(csv())
      .on('data', (data) => csvList.push(data))
      .on('end', () => {
        resolve(csvList)
      })
      .on('error', (e) => reject(e))
  })
}

module.exports = {
  main: async function (csvFilePath) {
    try {
      const csvList = await getCsvList(csvFilePath)
      await insert_rows_to_db_from_csv.main(csvList);
      await call_api_using_rows_from_function_above.main();
      await work_with_rows_updated_in_previous_call_above.main();
      await and_so_on.main();
      await and_so_on_and_on.main();
      await and_so_on_and_on_and_on.main();
    } catch (err) {
      console.log(err.stack)
    } finally {
    }
  }
};
Max
  • This is a much better design, cleaner. Thank you. I have updated my code to reflect the example you have given, and I believe this fixed one problem: it does block before going to the next step, but it still fails in a function call that depends on a previous call completing its inserts and commit. – user10664542 Dec 02 '19 at 22:04
  • In `step one` in my example above, after reading a csv, inserting to the DB (success there), I iterate through those rows, call a 3rd party API, and populate another database table from the response with new rows, and that code executes, and inserts new rows (no failures), and that step blocks and completes, I traced through the try/finally{} to verify, and I am issuing an explicit commit in the finally{}, but the new rows inserted are not visible to the next step in the process. And also, Postgres by default is explicit autocommit, unless turned off, so my commit is redundant. – user10664542 Dec 02 '19 at 22:06
  • @user10664542 It seems more likely given the information about a 3rd party API that at some point you've got a loose callback/promise escaping from the "synchronous" context of async/await. By that I mean a typical node-style callback not wrapped in a promise or a promise not awaited. This would cause a potential race condition leading to out-of-order queries, causing effects identical to the ones you're observing – Klaycon Dec 02 '19 at 22:36
  • I updated another code block, where I think the problem is (something amiss there, I feel - but not sure what ), in section beginning *PART 2* above. I feel I'm close to the problem there. – user10664542 Dec 02 '19 at 22:43
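Klaycon's "loose callback" scenario looks like this: a callback-style API call inside an async function that nothing awaits. A minimal sketch of the bug and the fix (`callbackApi` is a made-up stand-in for the 3rd-party client, not from the question):

```javascript
// A made-up node-style callback API standing in for the 3rd-party client.
function callbackApi(arg, cb) {
    setTimeout(() => cb(null, `result:${arg}`), 10);
}

const log = [];

// BUG: the callback escapes the async flow -- this resolves before cb fires.
async function leaky() {
    callbackApi('a', (err, res) => log.push(res));
    log.push('leaky returned');    // logged BEFORE 'result:a' arrives
}

// FIX: wrap the callback in a Promise so it can be awaited.
function apiAsPromise(arg) {
    return new Promise((resolve, reject) => {
        callbackApi(arg, (err, res) => (err ? reject(err) : resolve(res)));
    });
}

async function sealed() {
    const res = await apiAsPromise('b');   // nothing runs past here until cb fires
    log.push(res);                         // 'result:b' logged before the next line
    log.push('sealed returned');
}

async function demo2() {
    await leaky();
    await sealed();
    return log;
}

demo2().then(l => console.log(l));
```

For standard `(err, result)` callbacks, Node's built-in `util.promisify` does the same wrapping without hand-writing the Promise.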