
I'm running a knex seed in Node and need to batch an additional query to my database due to restrictions on my server. I'm starting to get the hang of promises and async/await, but I'm having trouble getting it to work at several levels deep (what's throwing me off in particular at this point is that it seems to interfere with the batching in a way that I can't quite make sense of). My seed file looks like this:

exports.seed = async function(knex) {
  const fs = require('fs');
  const _ = require('lodash');

  function get_event_id(location) {
    return knex('events')
      .where({location: location})
      .first()
      .then(result => { return result['id']; })
      .finally(() => { knex.destroy() })
  }

  function createImage(row, event_id) {
    return {
      name: row[4],
      event_id: event_id
    }
  };

  async function run_query(line) {
      let row = line.split(',');
      let event_id = await get_event_id(row[0]);
      return createImage(row, event_id);
  };

  async function run_batch(batch) {

      return Promise.all(batch.map(run_query));
  } 

  const file = fs.readFileSync('./data.csv');
  const lines = file.toString().replace(/[\r]/g, '').split('\n').slice(1,60); // skip csv header, then run first 59 lines

  const batches = _.chunk(lines, 30); // set batch size

  let images = await Promise.all(batches.map(run_batch));

  console.log(_.flatten(images).length);

};

My database can handle 30 queries at a time. Everything resolves properly if I run a single batch using .slice(1,30) on the line where lines is defined. But running with 60 as above gives me ER_TOO_MANY_USER_CONNECTIONS: User already has more than 'max_user_connections' active connections.

The script completes if I change the content of run_batch to return batch.map(run_query), and it returns the correct number of entries (so it seems to be batching properly). But then the Promises are still pending. What am I missing, and is there a more elegant way to do this?

user6647072

1 Answer


In this line:

let images = await Promise.all(batches.map(run_batch));

You are trying to run ALL the batches in parallel, which defeats your chunking entirely.

You could use a regular `for` loop with `await` instead of the `.map()` so you run a batch, wait for it to finish, then run the next batch.

let allResults = [];
for (let batch of batches) {
     let images = await run_batch(batch);
     allResults.push(...images);
}
console.log(allResults);

FYI, you might benefit from any of a number of functions people have written for processing a large array with no more than N requests in flight at the same time. These don't require you to manually break the data into batches. Instead, they monitor how many requests are in flight, start up your desired number of requests, and as each one finishes they start another, collecting the results for you.

runN(fn, limit, cnt, options): Loop through an API on multiple requests

pMap(array, fn, limit): Make several requests to an api that can only handle 20 at a time

rateLimitMap(array, requestsPerSec, maxInFlight, fn): Proper async method for max requests per second

mapConcurrent(array, maxConcurrent, fn): Promise.all() consumes all my ram

There are also features to do this built into the Bluebird promise library and the Async-promises library.
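
The core idea behind those helpers can be sketched as below. This is my own minimal version in the spirit of the `mapConcurrent()` answer linked above, not the exact code from any of those posts: it dispatches up to `maxConcurrent` promises at once, starts a new one each time one settles, and preserves result order.

```javascript
// Sketch of a concurrency-limited map. Up to maxConcurrent calls to
// fn(item, index) are in flight at once; results come back in input order.
function mapConcurrent(array, maxConcurrent, fn) {
  return new Promise((resolve, reject) => {
    const results = new Array(array.length);
    let nextIndex = 0;   // next array item to dispatch
    let inFlight = 0;    // promises currently pending
    let failed = false;

    function runNext() {
      if (failed) return;
      // everything dispatched and everything finished
      if (nextIndex >= array.length && inFlight === 0) {
        resolve(results);
        return;
      }
      // top up to the concurrency limit
      while (inFlight < maxConcurrent && nextIndex < array.length) {
        const i = nextIndex++;
        inFlight++;
        Promise.resolve(fn(array[i], i))
          .then(result => {
            results[i] = result;
            inFlight--;
            runNext();
          })
          .catch(err => {
            failed = true;
            reject(err);
          });
      }
    }
    runNext();
  });
}
```

With this, the seed body would reduce to something like `let images = await mapConcurrent(lines, 30, run_query);` with no manual chunking at all.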

jfriend00
  • Thanks, this is very helpful (I'm just trying to understand the basic patterns at this point, so I want to avoid extra libraries where possible). Your answer makes sense, and seems to make some progress, but now I get `Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?` when I run more than a single batch. I would have thought the `knex.destroy()` would take care of that... is there a way I can reset the pool after each batch? – user6647072 Jan 29 '20 at 22:44
  • @user6647072 - I'm not very familiar with knex, but the little I've read sounds like `.destroy()` is killing your connection pool which is likely not what you want. If you don't figure this out easily, then perhaps start a new question that focuses on the knex part of the question. – jfriend00 Jan 29 '20 at 22:50
  • 1
    @user6647072 - See [Where to destroy Knex connection](https://stackoverflow.com/questions/54999115/where-to-destroy-knex-connection). It looks like you just want to remove that. It's destroying the connection pool and you will have to create a new pool after calling `.destroy()`. I think you can just safely remove it from this part of your code. After a query finishes, the connection goes back in the connection pool automatically. You can control the connection pool size where you initialize your knex instance. – jfriend00 Jan 29 '20 at 22:54
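
To illustrate that last comment, a hedged sketch of what the initialization might look like (connection details are placeholders, and the exact numbers depend on your server): knex exposes a `pool` option where the instance is created, so sizing the pool there replaces the `.finally(() => knex.destroy())` call entirely.

```javascript
// Hypothetical knex initialization sketch -- not the asker's actual config.
// The pool is sized once here; individual queries borrow and return
// connections automatically, so no per-query destroy() is needed.
const knex = require('knex')({
  client: 'mysql',
  connection: {
    // host, user, password, database go here
  },
  pool: { min: 0, max: 30 } // keep max at or below the server's connection limit
});
```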