// Based on @François Dispaux's answer, this is an improved bulkUpsert function.
// It works with Sequelize and PostgreSQL.
// Note: some of the code below is adapted from the Sequelize library source.
// Version: 6.17.0
// yarn add sequelize@6.17.0
//
const _ = require('lodash');
const { Sequelize, Model, Utils, QueryTypes, QueryError } = require('sequelize');
// --------------------------------------------------------------
// --------------------------------------------------------------
// Object-spread helpers. They appear to be transpiler output (esbuild-style
// helpers) copied together with the findAll code from the Sequelize build.
const __defProp = Object.defineProperty;
const __defProps = Object.defineProperties;
const __getOwnPropDescs = Object.getOwnPropertyDescriptors;
const __getOwnPropSymbols = Object.getOwnPropertySymbols;
const __hasOwnProp = Object.prototype.hasOwnProperty;
const __propIsEnum = Object.prototype.propertyIsEnumerable;

// Set `target[key] = value` as a plain writable/enumerable/configurable data
// property. Keys already reachable on `target` (own or inherited, e.g.
// `toString`) go through defineProperty so an inherited setter or read-only
// property is not triggered. Returns the assigned value or the target.
const __defNormalProp = (target, key, value) => {
  if (!(key in target)) {
    return (target[key] = value);
  }
  return __defProp(target, key, { enumerable: true, configurable: true, writable: true, value });
};

// Copy the own enumerable string keys, then own enumerable symbol keys, of
// `source` onto `target` (like `{ ...target, ...source }` done in place).
// A falsy `source` is treated as an empty object. Returns `target`.
const __spreadValues = (target, source) => {
  const src = source || {};
  for (const key in src) {
    if (__hasOwnProp.call(src, key)) {
      __defNormalProp(target, key, src[key]);
    }
  }
  if (__getOwnPropSymbols) {
    for (const sym of __getOwnPropSymbols(src)) {
      if (__propIsEnum.call(src, sym)) {
        __defNormalProp(target, sym, src[sym]);
      }
    }
  }
  return target;
};

// Define every own property of `extra` on `target` using its full descriptor.
const __spreadProps = (target, extra) => __defProps(target, __getOwnPropDescs(extra));
// --------------------------------------------------------------
// --------------------------------------------------------------
/**
 * Build the SQL SELECT statement that `model.findAll(options)` would run, without executing it.
 *
 * The preparation steps are copied from Sequelize v6 `Model.findAll`: option validation,
 * scope injection, include expansion, the paranoid clause, and the `beforeFind`,
 * `beforeFindAfterExpandIncludeAll` and `beforeFindAfterOptions` hooks. The hooks are
 * skipped when `options.hooks` is set to a falsy value; it defaults to true. Instead of
 * calling `queryInterface.select`, the prepared options go straight to the dialect query
 * generator's `selectQuery`.
 *
 * Side effect: like `findAll`, this assigns `model.options.whereCollection`.
 *
 * @param {typeof Model} model Sequelize model class (e.g. `User`), not a model instance.
 * @param {Object} [options] Same options object accepted by `findAll`. It is deep-cloned
 *   before scopes and hooks modify it.
 * @param {Object} [settings] Extra settings; may be omitted entirely.
 * @param {Boolean} [settings.removeSemicolon=false] Strip the trailing `;` so the query can
 *   be embedded in a larger statement, e.g. combined with UNION ALL.
 * @returns {Promise<String>} The SQL SELECT query.
 * @throws {QueryError} If `options` is not a plain object, or `options.attributes` is
 *   neither an array nor a plain object.
 */
async function buildFindAllSQL(model, options, { removeSemicolon = false } = {}) {
  if (options !== void 0 && !_.isPlainObject(options)) {
    throw new QueryError("The argument passed to findAll must be an options object, use findByPk if you wish to pass a single primary key value");
  }
  if (options !== void 0 && options.attributes) {
    if (!Array.isArray(options.attributes) && !_.isPlainObject(options.attributes)) {
      throw new QueryError("The attributes option must be an array of column names or an object");
    }
  }
  model.warnOnInvalidOptions(options, Object.keys(model.rawAttributes));
  const tableNames = {};
  tableNames[model.getTableName(options)] = true;
  options = Utils.cloneDeep(options);
  _.defaults(options, { hooks: true });
  options.rejectOnEmpty = Object.prototype.hasOwnProperty.call(options, "rejectOnEmpty") ? options.rejectOnEmpty : model.options.rejectOnEmpty;
  model._injectScope(options);
  if (options.hooks) {
    await model.runHooks("beforeFind", options);
  }
  model._conformIncludes(options, model);
  model._expandAttributes(options);
  model._expandIncludeAll(options);
  if (options.hooks) {
    await model.runHooks("beforeFindAfterExpandIncludeAll", options);
  }
  options.originalAttributes = model._injectDependentVirtualAttributes(options.attributes);
  if (options.include) {
    options.hasJoin = true;
    model._validateIncludedElements(options, tableNames);
    // With joins the primary key is needed to de-duplicate parent rows, so it is
    // prepended unless the query is raw or grouped on a single association.
    if (options.attributes && !options.raw && model.primaryKeyAttribute && !options.attributes.includes(model.primaryKeyAttribute) && (!options.group || !options.hasSingleAssociation || options.hasMultiAssociation)) {
      options.attributes = [model.primaryKeyAttribute].concat(options.attributes);
    }
  }
  if (!options.attributes) {
    options.attributes = Object.keys(model.rawAttributes);
    options.originalAttributes = model._injectDependentVirtualAttributes(options.attributes);
  }
  model.options.whereCollection = options.where || null;
  Utils.mapFinderOptions(options, model);
  options = model._paranoidClause(model, options);
  if (options.hooks) {
    await model.runHooks("beforeFindAfterOptions", options);
  }
  const selectOptions = { ...options, tableNames: Object.keys(tableNames) };
  // This function is based on the findAll function of the Model class.
  // findAll calls model.queryInterface.select, and QueryInterface#select
  // builds the SELECT query through the query generator, which is called directly here.
  const sql = model.sequelize.queryInterface.queryGenerator.selectQuery(model.getTableName(selectOptions), { ...selectOptions, type: QueryTypes.SELECT, model }, model);
  if (removeSemicolon && sql.endsWith(';')) {
    return sql.slice(0, -1);
  }
  return sql;
}
/**
 * Render each item as an escaped SQL VALUES tuple string such as `(1,'a',NULL)`.
 *
 * The items are deep-cloned first. Each row is then passed through
 * `Utils.mapValueFieldNames` (restricted to `fields`) and stripped of the model's
 * virtual attributes. Every value is escaped by the dialect query generator in
 * INSERT context, using the attribute definition whose column name equals the key.
 *
 * NOTE(review): lookups use `fields` as column names, so items appear to be expected
 * keyed by database column names — confirm with callers when `field` differs from the
 * attribute name.
 *
 * @param {Array<Object>} items Rows to render.
 * @param {Object} opts
 * @param {typeof Model} opts.model Sequelize model class.
 * @param {Array<String>} opts.fields Keys to emit; each tuple follows this order.
 * @returns {Array<String>} One parenthesised tuple string per item.
 */
function mapValues(items, { model, fields }) {
  const rows = _.cloneDeep(items);

  // Index attribute definitions by database column name (`field`, falling back to the attribute name).
  const attributesByColumn = {};
  for (const name in model.tableAttributes) {
    attributesByColumn[model.rawAttributes[name].field || name] = model.rawAttributes[name];
  }

  return rows
    .map((row) => {
      const mapped = Utils.mapValueFieldNames(row, fields, model);
      for (const virtualName of model._virtualAttributes) {
        delete mapped[virtualName];
      }
      return mapped;
    })
    .map((mapped) => {
      const escaped = fields.map((column) => model.sequelize.queryInterface.queryGenerator.escape(
        mapped[column],
        attributesByColumn[column],
        { context: 'INSERT' },
      ));
      return `(${escaped.join(',')})`;
    });
}
/**
 * Build a PostgreSQL multi-row `INSERT ... [ON CONFLICT ...] [RETURNING ...]` statement.
 *
 * The column list is taken from the keys of the FIRST item, and every item is rendered
 * with those same keys via `mapValues`, so all items should share one shape. Keys are
 * used verbatim as column names. All identifiers are double-quoted with embedded `"`
 * doubled. Values are escaped by `mapValues`.
 *
 * @param {Array<Object>} [items=[]] Rows to insert / update.
 * @param {Object} opts
 * @param {typeof Model} opts.model Sequelize model class; its `tableName` is the target table.
 * @param {Array<String>} [opts.conflictKeys=[]] Conflict-target columns. When empty, no
 *   ON CONFLICT clause is emitted (plain INSERT).
 * @param {Array<String>} [opts.excludeFromUpdate=[]] Columns never overwritten on conflict.
 * @param {Array<String>|String} [opts.conflictWhere=[]] Raw SQL condition(s) for the
 *   `DO UPDATE ... WHERE` clause, combined with AND. They are inserted verbatim, so never
 *   pass untrusted input.
 * @param {Boolean} [opts.returning=false] Append `RETURNING` with the inserted columns.
 * @param {Function|false} [opts.logging=false] If a function, it is called with the generated SQL.
 * @returns {Promise<String|null>} The SQL statement, or null when `items` is empty.
 */
async function buildBulkUpsertSQL(items = [], {
  model,
  conflictKeys = [],
  excludeFromUpdate = [],
  conflictWhere = [],
  returning = false,
  logging = false,
}) {
  if (!items.length) {
    return null;
  }
  // Quote a PostgreSQL identifier. Doubling embedded quotes stops a hostile key from
  // breaking out of the identifier and injecting SQL.
  const quote = (identifier) => `"${String(identifier).replace(/"/g, '""')}"`;
  const { tableName } = model;
  const fields = Object.keys(items[0]);
  const columnList = fields.map(quote).join(',');

  const notUpdated = new Set([...excludeFromUpdate, ...conflictKeys]);
  // EXCLUDED columns must be quoted too: unquoted names are folded to lower case by
  // Postgres, so camelCase columns such as "updatedAt" would not be found.
  const assignments = fields
    .filter((field) => !notUpdated.has(field))
    .map((field) => `${quote(field)}=EXCLUDED.${quote(field)}`);

  // mapValues deep-clones the items itself, so no extra copy is made here.
  const values = mapValues(items, { model, fields }).join(',');
  const parts = [`INSERT INTO ${quote(tableName)} (${columnList}) VALUES ${values}`];

  if (conflictKeys.length > 0) {
    parts.push(`ON CONFLICT (${conflictKeys.map(quote).join(',')})`);
    if (assignments.length === 0) {
      // Every column is a conflict key or excluded: an empty "DO UPDATE SET" is a syntax error.
      parts.push('DO NOTHING');
    } else {
      parts.push(`DO UPDATE SET ${assignments.join(', ')}`);
      const conditions = [].concat(conflictWhere);
      if (conditions.length > 0) {
        // Multiple conditions must be combined with AND (a comma is invalid in WHERE).
        parts.push(`WHERE ${conditions.map((condition) => `(${condition})`).join(' AND ')}`);
      }
    }
  }
  if (returning === true) {
    parts.push(`RETURNING ${columnList}`);
  }
  const query = `${parts.join(' ')};`;
  if (typeof logging === 'function') {
    logging('---------------------------------------');
    logging(query);
    logging('---------------------------------------');
  }
  return query;
}
/**
 * Insert or update many rows of `model` with one statement built by `buildBulkUpsertSQL`.
 *
 * @param {Array<Object>} [items=[]] Rows to insert / update.
 * @param {Object} opts
 * @param {typeof Model} opts.model Sequelize model class; its `sequelize` instance runs the query.
 * @param {Array<String>} [opts.conflictKeys=[]] Conflict-target columns (no ON CONFLICT when empty).
 * @param {Array<String>} [opts.excludeFromUpdate=[]] Columns never overwritten on conflict.
 * @param {Array<String>} [opts.conflictWhere=[]] Raw SQL conditions for the DO UPDATE WHERE clause.
 * @param {Transaction|null} [opts.transaction=null] When given, the statement runs inside it.
 * @param {Function|false} [opts.logging=false] Forwarded to `buildBulkUpsertSQL`, which logs the
 *   generated SQL when this is a function. It is not passed to `sequelize.query`.
 * @param {Boolean} [opts.returning=false] Forwarded to `buildBulkUpsertSQL` to append RETURNING.
 * @returns {Promise<Array>} The result of `sequelize.query`, or `[0, 0]` when there is nothing to write.
 */
async function bulkUpsert(items = [], {
  model,
  conflictKeys = [],
  excludeFromUpdate = [],
  conflictWhere = [],
  transaction = null,
  logging = false,
  returning = false,
}) {
  if (!items.length) {
    return [0, 0];
  }
  const query = await buildBulkUpsertSQL(items, {
    model,
    conflictKeys,
    excludeFromUpdate,
    conflictWhere,
    returning,
    logging,
  });
  if (!query) {
    return [0, 0];
  }
  const { sequelize } = model;
  const options = { type: sequelize.QueryTypes.INSERT };
  if (transaction) {
    // Must be the literal `transaction` key. A computed key (options[transaction])
    // would leave the query running outside the transaction.
    options.transaction = transaction;
  }
  return sequelize.query(query, options);
}
// --------------------------------------------------------------
// Public API of this module: SQL builders plus the executing bulkUpsert helper.
const api = {
  buildFindAllSQL,
  buildBulkUpsertSQL,
  bulkUpsert,
  mapValues,
};

module.exports = api;