
I am trying to run a number of promises in parallel, in batches, but the next batch starts before the first batch has finished processing. The code works when I use simple timeout promises. Can anyone help?

 await asyncFunctionsInBatches.reduce(async (previousBatch, currentBatch, index) => {
    await previousBatch;
    console.time(`batch ${index}`);
    console.log(`Processing batch ${index}...`);
    const currentBatchPromises = currentBatch.map(async (row) =>
      await sqlFetch(queryDate, queryDate1, username, password, row)
    );
    await Promise.all(currentBatchPromises);
    console.timeEnd(`batch ${index}`);
  }, Promise.resolve());
}

This is the promise-returning function I am using:

const sqlFetch = async (
  queryDate,
  queryDate1,
  username,
  password,
  rowNumber
) => {
  ( await createUnixSocketPool(username, password)).query(
    fetchQuery.sql,
    [queryDate, queryDate1, rowNumber],
    async function (err, result) {
      if (err) {
        console.log(err);
      }
      console.log(result)
      
       return  publishMessage(result).catch(console.error);
       
    }
  );
};
T.J. Crowder
john
  • Why use `reduce` for something that's just a loop? – T.J. Crowder Aug 05 '21 at 11:35
  • *"I am trying to run a number of promises in parallels in a batch but it starts the next batch before I have finished processing the first batch"* That tells us that somewhere, there's a missing `await`, or you're `await`ing something that does asynchronous work but *doesn't* return a promise, or you're passing an `async` function to something that doesn't understand its promise return value. For instance, are you sure that the `query` function you're using in `sqlFetch` returns a promise? And that it expects one from its callback? It's somewhat unusual for it to *also* accept a callback... – T.J. Crowder Aug 05 '21 at 11:38

2 Answers


In `sqlFetch`, you're awaiting the result of `createUnixSocketPool` but not the result of `query`:

const sqlFetch = async (
  queryDate,
  queryDate1,
  username,
  password,
  rowNumber
) => {
  ( await createUnixSocketPool(username, password)).query(
//  −−−−− −−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−  −−−−−−
//    ↑                      ↑                        ↑
//     \−−− only applies to /  not to −−−−−−−−−−−−−−−/
    fetchQuery.sql,
    [queryDate, queryDate1, rowNumber],
    async function (err, result) {
      if (err) {
        console.log(err);
      }
      console.log(result)
      
       return  publishMessage(result).catch(console.error);
       
    }
  );
};

As a result, the promise from `sqlFetch` is immediately fulfilled with `undefined`, while the query is still running.
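To make that timing concrete, here is a minimal sketch. `fakeQuery`, `brokenFetch`, `fixedFetch`, and `demo` are hypothetical names made up for illustration; `fakeQuery` only imitates a callback-style `query` with a timer and is not the real database API.

```javascript
// Hypothetical stand-in for a callback-style `query`: it calls back later
// and returns nothing, so there is no promise to await.
function fakeQuery(sql, callback) {
  setTimeout(() => callback(null, `rows for ${sql}`), 50);
}

// Same shape as the question's sqlFetch: nothing waits for the callback.
const brokenFetch = async (sql, log) => {
  fakeQuery(sql, () => {
    log.push("callback ran");
  });
};

// Wrapping the callback in a promise makes the caller wait for it.
const fixedFetch = (sql, log) =>
  new Promise((resolve, reject) => {
    fakeQuery(sql, (err, result) => {
      if (err) {
        reject(err);
      } else {
        log.push("callback ran");
        resolve(result);
      }
    });
  });

async function demo() {
  const log = [];
  const value = await brokenFetch("SELECT 1", log);
  log.push(`brokenFetch settled with ${value}`);
  // Give the still-pending callback from brokenFetch time to fire.
  await new Promise((resolve) => setTimeout(resolve, 100));
  const rows = await fixedFetch("SELECT 2", log);
  log.push(`fixedFetch settled with ${rows}`);
  return log;
}

demo().then((log) => console.log(log.join("\n")));
```

With the broken version, the "settled" entry is logged before the callback runs; with the wrapped version, the callback runs first.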

Without knowing what `query` expects from its callback (you're giving it a promise; does it really expect one?) or whether it returns a promise, it's hard to help, other than to say that if it returns a promise, `await` that, and if it doesn't, handle that (e.g., as described in the answers to this question).


Here's an update based on information gleaned from the answer you posted:

const sqlFetch = (
    queryDate,
    queryDate1,
    username,
    password,
    rowNumber
) => new Promise((resolve, reject) => {
    createUnixSocketPool(username, password)
    .then(pool => {
        pool.query(
            fetchQuery.sql,
            [queryDate, queryDate1, rowNumber],
            (err, result) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(publishMessage(result));
                }
            }
        );    
    })
    .catch(reject);
});

Or better yet, write a promise-enabled wrapper for query as described in the link above:

function asyncQuery(pool, ...args) {
    return new Promise((resolve, reject) => {
        pool.query(...args, (err, result) => {
            if (err) {
                reject(err);
            } else {
                resolve(result);
            }
        });    
    });
}

and then it's really nice and simple:

const sqlFetch = async (
    queryDate,
    queryDate1,
    username,
    password,
    rowNumber
) => {
    const pool = await createUnixSocketPool(username, password);
    const result = await asyncQuery(pool, fetchQuery.sql, [queryDate, queryDate1, rowNumber]);
    return publishMessage(result);
};
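Once `sqlFetch` returns a promise that settles only when the work is done, the batching itself can be a plain loop rather than `reduce`. The following is a sketch under assumptions: `runInBatches` and `fakeSqlFetch` are hypothetical names, and `fakeSqlFetch` uses a timer in place of the real query so the example runs on its own.

```javascript
// Hypothetical stand-in for the fixed sqlFetch: resolves after a short delay.
const fakeSqlFetch = (row, log) =>
  new Promise((resolve) => {
    setTimeout(() => {
      log.push(`row ${row} done`);
      resolve(row);
    }, 20);
  });

// Run the batches one after another; rows within a batch run in parallel.
async function runInBatches(batches, worker) {
  const log = [];
  for (const [index, batch] of batches.entries()) {
    log.push(`batch ${index} start`);
    // Wait for every row in this batch before starting the next batch.
    await Promise.all(batch.map((row) => worker(row, log)));
    log.push(`batch ${index} end`);
  }
  return log;
}

runInBatches([[1, 2], [3, 4]], fakeSqlFetch).then((log) =>
  console.log(log.join("\n"))
);
```

Each "batch N end" entry appears only after all rows of that batch have reported done, which is the ordering the question was after.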
T.J. Crowder
  • Thank you. I wrapped it in a promise and returned a resolved promise; it works fine now. – john Aug 05 '21 at 12:23

I wrapped the query in a promise:

const sqlFetch = (
  queryDate,
  queryDate1,
  username,
  password,
  rowNumber
) => new Promise(async (resolve, reject) => {
  (await createUnixSocketPool(username, password)
  .catch(reject)).query(
    fetchQuery.sql,
    [queryDate, queryDate1, rowNumber],
    function (err, result) {
      if (err) reject(err);
      else publishMessage(result).catch(reject).then(resolve);
    }
  );    
});
john
  • `else publishMessage(result).catch(reject).then(resolve);` can be just `else resolve(publishMessage(result))`. That will reject your promise if the promise from `publishMessage` rejects. And generally, don't mix `.then`/`.catch` with `async`/`await`, use `try`/`catch`. – T.J. Crowder Aug 05 '21 at 13:19
  • @T.J.Crowder Thanks again :) – john Aug 05 '21 at 13:20
  • I've updated my answer with some ways to do it based on what I infer from the above, namely that `createUnixSocketPool` *does* return a promise and `query` does not. – T.J. Crowder Aug 05 '21 at 13:29