
I'd like to perform a batch update using Knex.js

For example:

'UPDATE foo SET [theValues] WHERE idFoo = 1'
'UPDATE foo SET [theValues] WHERE idFoo = 2'

with values:

{ name: "FooName1", checked: true } // to `idFoo = 1`
{ name: "FooName2", checked: false } // to `idFoo = 2`

I was previously using node-mysql, which allows multiple statements. With it, I simply built a multiple-statement query string and sent it over the wire in a single run.

I'm not sure how to achieve the same with Knex. I can see batchInsert as an API method I can use, but nothing as far as batchUpdate is concerned.

Note:

  • I can do an async iteration and update each row separately. That's bad because it means lots of round trips between the server and the DB

  • I can use Knex's raw() and probably do something similar to what I did with node-mysql. However, that defeats the whole purpose of Knex being a DB abstraction layer (it introduces strong DB coupling)

So I'd like to do this using something "knex-y".

Any ideas welcome.

nicholaswmin
    Why not do parallel async operations? It consumes a bit more RAM because you're registering a callback for each update but from the network card's point of view it just looks like one big stream of updates. – slebetman Nov 11 '16 at 09:05
  • @slebetman doesn't sound bad - I'll give it a shot and if it looks good, I'll post an answer – nicholaswmin Nov 11 '16 at 09:26
  • @slebetman Revisiting this after 1.5 years. Back then, I went with your solution instead of what I accepted and it worked great. If you ever come around to writing an answer I'll accept yours instead. – nicholaswmin Apr 27 '18 at 13:54

5 Answers


I needed to perform a batch update inside a transaction (I didn't want partial updates in case something went wrong). I resolved it like this:

// I wrap knex as 'connection'
return connection.transaction(trx => {
    const queries = [];
    users.forEach(user => {
        const query = connection('users')
            .where('id', user.id)
            .update({
                lastActivity: user.lastActivity,
                points: user.points,
            })
            .transacting(trx); // This makes every update be in the same transaction
        queries.push(query);
    });

    return Promise.all(queries) // Once every query is written
        .then(trx.commit) // We try to execute all of them
        .catch(trx.rollback); // And rollback in case any of them goes wrong
});
A Bravo Dev
    When using mssql, if you try Promise.all(queries), tedious (the NodeJS mssql library) will error out with `Can't acquire connection for the request. There is another request in progress.` and Knex follows up with `Can't rollback transaction. There is a request in progress.`. This is because Promise.all() executes all queries in parallel. To fix this, all promises must be resolved one after another. You can do this with code [in this stackoverflow question](https://stackoverflow.com/questions/24586110/resolve-promises-one-after-another-i-e-in-sequence). – RedShift Jul 01 '19 at 09:06
    [Also see this bug report from Tedious](https://github.com/tediousjs/node-mssql/issues/491) – RedShift Jul 01 '19 at 09:07
    This answer helped me a lot, but I struggled with data not being written to db; it turns out I needed to replace `.then(trx.commit)` and `.catch(trx.rollback)` with arrow functions: `.then(() => trx.commit)` and `.catch(() => trx.rollback)` – Marius B Jun 08 '20 at 19:23
  • @RedShift, I'm getting Error `Can't acquire connection for the request. There is another request in progress`. Checked the link provided by you. Can you provide simple example. – Himanshu Shekhar Jun 26 '20 at 10:07
  • users is not defined.. where did you get users from? – justdvl Jun 30 '21 at 13:40
  • @justdvl `users` isn't defined, but it would look like a collection of user objects. – jiminikiz Oct 07 '22 at 15:36
  • @RedShift your point about `Promise.all()` executing all queries in parallel seemed to be right for my problem. Solved it by following the thread you linked, thanks! – Nathan Tew Apr 22 '23 at 14:23

Assuming you have a collection of valid keys/values for the given table:

// abstract transactional batch update
function batchUpdate(table, collection) {
  return knex.transaction(trx => {
    const queries = collection.map(tuple =>
      knex(table)
        .where('id', tuple.id)
        .update(tuple)
        .transacting(trx)
    );
    return Promise.all(queries)
      .then(trx.commit)    
      .catch(trx.rollback);
  });
}

To call it:

batchUpdate('user', [...]);

Are you unfortunately subject to non-conventional column names? No worries, I got you fam:

function batchUpdate(options, collection) {
  return knex.transaction(trx => {
    const queries = collection.map(tuple =>
      knex(options.table)
        .where(options.column, tuple[options.column])
        .update(tuple)
        .transacting(trx)
    );
    return Promise.all(queries)
      .then(trx.commit)    
      .catch(trx.rollback);
  });
}

To call it:

batchUpdate({ table: 'user', column: 'user_id' }, [...]);

Modern Syntax Version:

const batchUpdate = async (options, collection) => {
  const { table, column } = options;
  const trx = await knex.transaction(); 
  try {
    await Promise.all(collection.map(tuple => 
      knex(table)
        .where(column, tuple[column])
        .update(tuple)
        .transacting(trx)
      )
    );
    await trx.commit();
  } catch (error) {
    await trx.rollback();
    throw error; // re-throw so the caller sees the failure
  }
}
jiminikiz
  • This is a nice solution, just curious, how can I customize this if I want to return all id after commit – Chenhai-胡晨海 Mar 05 '20 at 19:52
  • Added my snippet below to return an array of ID values. Hope it helps! – Ryan Brockhoff May 03 '20 at 18:00
  • Nothing big changes, only remove the primary key column from the updated columns: ```collection.map((tuple) => { const data = { ...tuple }; delete data[column]; return knex(table).where(column, tuple[column]).update(data).transacting(trx); })``` – Pash Jul 27 '23 at 12:09

You have a good idea of the pros and cons of each approach. I would recommend a raw query that bulk updates over several async updates. Yes you can run them in parallel, but your bottleneck becomes the time it takes for the db to run each update. Details can be found here.

Below is an example of a batch upsert using knex.raw. Assume that records is an array of objects (one object per row we want to update) whose property names line up with the columns in the database you want to update:

var knex = require('knex'),
    _ = require('underscore');

function bulkUpdate (records) {
    var updateQuery = [
        'INSERT INTO mytable (primaryKeyCol, col2, colN) VALUES',
        _.map(records, () => '(?)').join(','),
        'ON DUPLICATE KEY UPDATE',
        'col2 = VALUES(col2),',
        'colN = VALUES(colN)'
    ].join(' '),

    vals = [];

    _(records).map(record => {
        vals.push(_(record).values());
    });

    return knex.raw(updateQuery, vals);
}

This answer does a great job explaining the runtime relationship between the two approaches.

Edit:

It was requested that I show what records would look like in this example.

var records = [
  { primaryKeyCol: 123, col2: 'foo', colN: 'bar' },
  // ...some other record, same props
];

Please note that if your records have more properties than the ones you specified in the query, you cannot do:

  _(records).map(record => {
      vals.push(_(record).values());
  });

Because you will hand the query too many values per record, and knex will fail to match the property values of each record with the ? characters in the query. Instead, you will need to explicitly push the values of each record that you want to insert into an array, like so:

  // assume a record has additional property `type` that you dont want to
  // insert into the database
  // example: { primaryKeyCol: 123, col2: 'foo', colN: 'bar', type: 'baz' }
  _(records).map(record => {
      vals.push(record.primaryKeyCol);
      vals.push(record.col2);
      vals.push(record.colN);
  });

There are less repetitive ways of doing the above explicit references, but this is just an example. Hope this helps!

Patrick Motard
  • Hello, can you provide a sample structure of the "records"? just to be sure that I'm doing the correct one thanks! – I am L Jul 05 '17 at 14:49
    @IamL Answered your question in an edit. Let me know if that helps you. :) If so, please upvote, if not I can explain further! – Patrick Motard Jul 06 '17 at 15:22
  • what if i've a different example `{ primaryKeyCol: 123, col2: {status: 'available, offline: false, etc .... props'}}` and does primaryKeyCol must be the primary key in my table , can't i update for email or something i know it's unique but it doesn't have the unique constraint. how do i update such a thing – sasha romanov Aug 25 '19 at 21:47
  • @PatrickMotard i get this error when i try to do it with your code `Error: Expected 1191 bindings, saw 397 at replaceRawArrBindings` – sasha romanov Aug 25 '19 at 22:52
  • using insert...on duplicate key update to just do updates this way has the flaw that it will actually insert if you mistakenly include a primary key that doesn't exist. an alternative on mysql 8.0.19+/mariadb 10.3+ is using a values table constructor https://dbfiddle.uk/?rdbms=mysql_8.0&fiddle=b9b320de7516ac3cac1b787c4d314049 – ysth Jul 07 '22 at 18:07
  • This code is unfortunately not transaction safe. I also recommend the native JS `map` function over importing `underscore` for just `_.map`. – jiminikiz Oct 07 '22 at 15:33
    Agreed @jiminikiz, I also no longer use underscore. The native JS methods are sufficient now. – Patrick Motard Oct 07 '22 at 15:42
    A transaction would be a great addition to this solution. Other answers here illustrate using transactions in the context of promises. Good point @jiminikiz! – Patrick Motard Oct 07 '22 at 15:45

The solution above works great for me! I just included an ID parameter to make it dynamic across tables with custom ID columns. Chenhai, here's my snippet, including a way to return a single array of ID values for the transaction:

function batchUpdate(table, id, collection) {
    return knex.transaction((trx) => {
        const queries = collection.map(async (tuple) => {
            const [tupleId] = await knex(table)
                .where(id, tuple[id])
                .update(tuple)
                .transacting(trx)
                .returning(id);

            return tupleId;
        });

        return Promise.all(queries).then(trx.commit).catch(trx.rollback);
    });
}

You can use response = await batchUpdate("table_name", "custom_table_id", [array of rows to update]) to get the returned array of IDs.

Ryan Brockhoff
    You don't need `async` nor `await ` in your `collection.map`. Just return `knex.update`. `Promise.all` takes an array of Promises. – nicholaswmin May 03 '20 at 18:54

The update can be done in batches, e.g. 1,000 rows per batch.

As long as it runs in batches, Bluebird's map can be used to control concurrency.

For more information on bluebird map: http://bluebirdjs.com/docs/api/promise.map.html

const { map } = require('bluebird');

const limit = 1000;
const totalRows = 50000;
const seq = count => Array(Math.ceil(count / limit)).keys();

const updateTable = async (dbTable, page) => {
    const offset = limit * page;

    return knex(dbTable).pluck('id').limit(limit).offset(offset).then(ids => {
        return knex(dbTable)
            .whereIn('id', ids)
            .update({ date: new Date() })
            .then((rows) => {
                console.log(`${page} - Updated rows of the table ${dbTable} from ${offset} to ${offset + limit}: `, rows);
            })
            .catch((err) => {
                console.log({ err });
            });
    })
    .catch((err) => {
        console.log({ err });
    });
};

// dbTable is assumed to hold the target table name, e.g. 'users'
map(seq(totalRows), page => updateTable(dbTable, page), { concurrency: 1 });

Here, pluck() is used to get the ids as an array.

  • You're swallowing errors. Don't handle `catch`, otherwise your script will keep going if updates failed. Your outer `.catch` won't trigger either since your inner `.catch` is console.logging and not throwing errors. – nicholaswmin Oct 25 '22 at 10:01
  • The outer `catch` is for `pluck('id').limit(limit).offset(offset)` and the inner `catch` is for `update()`. Thanks – Shayan Shaikh Oct 26 '22 at 07:40
  • I understand that, what I'm saying is that you're dangerously handling errors. – nicholaswmin Oct 26 '22 at 12:08