31

Is there any option to perform bulk upserts with mongoose? So basically, having an array, insert each element if it does not exist or update it if it does? (I am using custom _ids.)

When I use .insert, MongoDB returns the error E11000 for duplicate keys (which should be updated instead). Inserting multiple new documents works fine though:

var Users = self.db.collection('Users');

Users.insert(data, function(err){
    if (err) {
        callback(err);
    }
    else {
        callback(null);
    }
});

Using .save returns an error that the parameter must be a single document:

Users.save(data, function(err){
   ...
});

This answer suggests there is no such option; however, it is specific to C# and also already 3 years old. So I was wondering if there is any option to do that using mongoose?

Thank you!

Neil Lunn
user3122267
  • What do you mean by bulk upsert? The update upsert flag if set to true creates a new document if no document was found to update. http://docs.mongodb.org/manual/reference/glossary/#term-upsert – joao Aug 13 '14 at 12:34
  • @joao Possibly referred to in the "Bulk" operations API as referred to in the answer given. – Neil Lunn Aug 13 '14 at 12:52
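
For reference, the single-document upsert described in joao's comment looks like this against the question's own Users collection (a minimal sketch; it assumes `data` is a single document with a custom `_id`, and reuses the question's `callback`):

// update() with { upsert: true } updates the matching document,
// or inserts it when nothing matches the custom _id.
Users.update(
    { _id: data._id },   // match on the custom _id
    data,                // the document to write
    { upsert: true },    // insert if no match was found
    function(err) {
        if (err) return callback(err);
        callback(null);
    }
);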

7 Answers

23

Not in "mongoose" specifically, or at least not yet as of writing. The MongoDB shell as of the 2.6 release actually uses the "Bulk operations API" "under the hood" as it were for all of the general helper methods. In it's implementation, it tries to do this first, and if an older version server is detected then there is a "fallback" to the legacy implementation.

All of the mongoose methods "currently" use the "legacy" implementation of the write concern response and the basic legacy methods. But there is a .collection accessor on any given mongoose model that essentially accesses the "collection object" from the underlying "node native driver" on which mongoose is itself implemented:

 var mongoose = require('mongoose'),
     Schema = mongoose.Schema;

 mongoose.connect('mongodb://localhost/test');

 var sampleSchema  = new Schema({},{ "strict": false });

 var Sample = mongoose.model( "Sample", sampleSchema, "sample" );

 mongoose.connection.on("open", function(err,conn) { 

    var bulk = Sample.collection.initializeOrderedBulkOp();
    var counter = 0;

    // representing a long loop
    for ( var x = 0; x < 100000; x++ ) {

        bulk.find(/* some search */).upsert().updateOne(
            /* update conditions */
        );
        counter++;

        if ( counter % 1000 == 0 )
            bulk.execute(function(err,result) {             
                bulk = Sample.collection.initializeOrderedBulkOp();
            });
    }

    if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
           // maybe do something with result
        });

 });

The main catch there is that "mongoose methods" are actually aware that a connection may not actually be made yet, and they "queue" operations until this is complete. The native driver you are "digging into" does not make this distinction.

So you really have to be aware that the connection is established in some form. But you can use the native driver methods as long as you are careful with what you are doing.
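
To make that concrete, here is a small guard sketch using mongoose's connection readyState (1 means "connected"); it only reaches for the native collection once the connection is known to be open:

// Only touch the native driver once mongoose reports an open connection
if ( mongoose.connection.readyState === 1 ) {
    var bulk = Sample.collection.initializeOrderedBulkOp();
    // ... queue and execute operations as shown above
} else {
    mongoose.connection.once("open", function() {
        // safe to access Sample.collection here
    });
}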

Neil Lunn
  • Thanks! This works great. I would have liked joao's approach, but I didn't manage to upload multiple documents with .update() ... Of course I could do it in a for loop, but I guess doing a bulk upload is more efficient? Or is there no difference as the DB connection is open anyways? – user3122267 Aug 13 '14 at 14:52
  • @user3122267 Upsert and Bulk are basically "chalk and cheese", not the same or even close. An "upsert" creates a new document where one does not exist, and "Bulk" is bulk operations. The other option is "multi", since `.update()` will only modify the "first" found document by default. Like the approach? See the big difference between commentators who know nothing and the people who answer who actually have the knowledge? – Neil Lunn Aug 13 '14 at 15:02
  • @zstew The correct place to ask new questions is asking another question rather than commenting on older posts. What you seem to have missed there is the statements made at the end of this answer. If you still don't understand what that means then ask another question. – Neil Lunn Jan 27 '15 at 02:40
  • I note that both this answer and @konsumer's loop all the records synchronously. I am curious about the performance difference of creating 10 `bulk` operations in one tick, as opposed to creating 10 `bulk` operations in 10 separate ticks (in terms of memory usage in Node). – joeytwiddle Apr 08 '16 at 04:09
  • 2
    @joeytwiddle "Bulk" operations are not async until you call `.execute()`. The purpose is that any "back and forth" to the server is going to cost in IO, so you are trying to minimalize it. True that in a synchronous loop you possibly have `.execute()` happening several times and using several connections. But you can alter that with something like [`async.whilst`](https://github.com/caolan/async#whilst) or other control where the iteration can be controlled by a callback ( and therefore inside `.execute()` ) to handle completion. That's a bit harder to do with promises, but still possible. – Neil Lunn Apr 08 '16 at 04:34
  • I tested both the techniques offered here, and with some help from `ulimit` I found that both can fill up Node's memory, if Node creates documents faster than Mongo can store them (and the network can transfer them). So for huge numbers of documents, we should limit the number of bulk ops being executed at one time, by waiting for completion as you say. Running 4 in parallel might be a good approach. – joeytwiddle Apr 28 '16 at 16:39
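
To make the `async.whilst` suggestion from the comments concrete, here is a minimal sketch that lets each batch's `.execute()` callback drive the next iteration, so only one batch is in flight at a time (the `docs` array and the batch size of 1000 are assumptions for illustration):

var async = require('async');

var pos = 0;
async.whilst(
    // test: keep going while there are documents left (async v1/v2 style, synchronous test)
    function() { return pos < docs.length; },
    // iteratee: build and execute one batch, then signal completion
    function(next) {
        var bulk = Sample.collection.initializeOrderedBulkOp();
        docs.slice(pos, pos + 1000).forEach(function(doc) {
            bulk.find({ "_id": doc._id }).upsert().updateOne({ "$set": doc });
        });
        pos += 1000;
        bulk.execute(next);   // iterate only after the server responds
    },
    // final callback: runs after all batches, or on the first error
    function(err) {
        if (err) throw err;
    }
);
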
19

You don't need to manage the limit (1000) as @neil-lunn suggested. Mongoose does this already. I used his great answer as a basis for this complete Promise-based implementation & example:

var Promise = require('bluebird');
var mongoose = require('mongoose');

var Show = mongoose.model('Show', {
  "id": Number,
  "title": String,
  "provider":  {'type':String, 'default':'eztv'}
});

/**
 * Atomic connect Promise - not sure if I need this, might be in mongoose already...
 * @return {Promise}
 */
function connect(uri, options){
  return new Promise(function(resolve, reject){
    mongoose.connect(uri, options, function(err){
      if (err) return reject(err);
      resolve(mongoose.connection);
    });
  });
}

/**
 * Bulk-upsert an array of records
 * @param  {Array}    records  List of records to update
 * @param  {Model}    Model    Mongoose model to update
 * @param  {Object}   match    Database field to match
 * @return {Promise}  always resolves a BulkWriteResult
 */
function save(records, Model, match){
  match = match || 'id';
  return new Promise(function(resolve, reject){
    var bulk = Model.collection.initializeUnorderedBulkOp();
    records.forEach(function(record){
      var query = {};
      query[match] = record[match];
      bulk.find(query).upsert().updateOne( record );
    });
    bulk.execute(function(err, bulkres){
        if (err) return reject(err);
        resolve(bulkres);
    });
  });
}

/**
 * Map function for EZTV-to-Show
 * @param  {Object} show EZTV show
 * @return {Object}      Mongoose Show object
 */
function mapEZ(show){
  return {
    title: show.title,
    id: Number(show.id),
    provider: 'eztv'
  };
}

// if you are not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}

// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
//   if(err) return console.log('EZ Error:', err);

//   var shows = shows.map(mapEZ);
  console.log('found', shows.length, 'shows.');
  connect('mongodb://localhost/tv', {}).then(function(db){
    save(shows, Show).then(function(bulkRes){
      console.log('Bulk complete.', bulkRes);
      db.close();
    }, function(err){
        console.log('Bulk Error:', err);
        db.close();
    });
  }, function(err){
    console.log('DB Error:', err);
  });

// });

This has the bonus of closing the connection when it's done, displaying any errors if you care, and ignoring them if not (error callbacks in Promises are optional). It's also very fast. Just leaving this here to share my findings. You can uncomment the eztv stuff if you want to save all eztv shows to a database, as an example.
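
For example, the save() helper above can also be called directly with an explicit match field (a usage sketch; the two records are invented for illustration):

connect('mongodb://localhost/tv', {}).then(function(db){
  // upsert two records, matching existing documents on their `id` field
  return save([
    { id: 1, title: 'Example Show One' },
    { id: 2, title: 'Example Show Two' }
  ], Show, 'id').then(function(bulkRes){
    // the BulkWriteResult reports how many documents were inserted vs updated
    console.log('upserted:', bulkRes.nUpserted, 'modified:', bulkRes.nModified);
    db.close();
  });
});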

konsumer
  • 1
    Wouldn't this consume more memory? – ECMAScript Aug 10 '15 at 18:37
  • consume more memory than what? – konsumer Aug 11 '15 at 22:18
  • Than executing every 1000 operations, you're storing more operations this way, right? – ECMAScript Aug 12 '15 at 13:50
  • No, that is my point. Mongoose does management of bulk operation count, already. You don't need to limit to 1000, because it already limits to whatever your mongo db is set to limit to. – konsumer Aug 12 '15 at 18:01
  • Yes but if you're doing 10,000 operations, you're storing 10,000 rather than manually executing every 1,000 operations. – ECMAScript Aug 12 '15 at 19:11
  • It's exactly the same, because mongoose already does the same limit management, as above. Try it out, and watch memory usage for both. – konsumer Aug 13 '15 at 16:54
  • To clarify, are you saying that if I pass an array of 10,000 records to the `save()` function, that mongoose might actually make 10 separate calls to the server? – joeytwiddle Apr 08 '16 at 04:08
  • 1
    Yep. That's what `bulk.execute` does. https://docs.mongodb.org/v3.0/reference/method/db.collection.initializeUnorderedBulkOp/ – konsumer Apr 08 '16 at 08:09
  • @ECMAScript didn't say whether he was concerned about Node's memory usage or Mongo's memory usage. – joeytwiddle Apr 08 '16 at 16:47
  • Again, try it out and see if it meets your needs, but this is the mongoose-way of doing bulk upserts. – konsumer Apr 09 '16 at 17:46
  • @joeytwiddle I'd still like to know. – ECMAScript Apr 09 '16 at 21:30
  • 2
    @ECMAScript In fact both Neil and konsumer's suggestions consume a similar amount of Node's memory, because both techniques keep creating documents without waiting for Mongo to respond. Obviously this is only a problem if you intend to insert more documents than can fit in your RAM. – joeytwiddle Apr 28 '16 at 16:53
  • @joeytwiddle and what do you think about my solution that I posted in this same answer in terms of performance since I have only one bulk object at a given time – PirateApp Jan 28 '17 at 07:15
  • @konsumer nope not working for me, I have a database at mlabs which is in production, and it keeps timing out when I am trying to insert 10000 items – PirateApp Jan 28 '17 at 07:16
  • 1
    @PirateApp maybe you are running out of memory to hold the structure? What is the error you get? You might have to use serial promises to run them one by one or run chunks of them in bulk, if you don't have the memory to hold it. – konsumer Jan 30 '17 at 18:15
  • @konsumer it's a MongoDB timeout error; currently I implemented this using a serial promise that runs a batch of 500 and goes on to the next one – PirateApp Jan 31 '17 at 03:40
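
For readers wanting the serial-batch variant mentioned in the last comment, here is a compact sketch built on the same bulk API (the batch size of 500 and matching on `id` are assumptions; swap in your own match field):

function saveInBatches(records, Model, batchSize) {
  batchSize = batchSize || 500;
  // build one promise chain so only a single bulk.execute() is in flight at a time
  var chain = Promise.resolve();
  for (var i = 0; i < records.length; i += batchSize) {
    (function(batch) {
      chain = chain.then(function() {
        return new Promise(function(resolve, reject) {
          var bulk = Model.collection.initializeUnorderedBulkOp();
          batch.forEach(function(record) {
            bulk.find({ id: record.id }).upsert().updateOne(record);
          });
          bulk.execute(function(err, res) {
            if (err) return reject(err);
            resolve(res);
          });
        });
      });
    })(records.slice(i, i + batchSize));
  }
  return chain;
}
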
7

await Model.bulkWrite(docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
})))


Or more verbose:

const bulkOps = docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
}))

Model.bulkWrite(bulkOps)
        .then(bulkWriteOpResult => console.log('BULK update OK:', bulkWriteOpResult))
        .catch(err => console.error('BULK update error:', err))

https://stackoverflow.com/a/60330161/5318303

Mir-Ismaili
5

I have released a plugin for Mongoose that exposes a static upsertMany method to perform bulk upsert operations with a promise interface.

An added benefit of using this plugin over initializing your own bulk op on the underlying collection is that this plugin converts your data to Mongoose models first, and then back to plain objects before the upsert. This ensures Mongoose schema validation is applied, and the data is depopulated and fit for raw insertion.

https://github.com/meanie/mongoose-upsert-many
https://www.npmjs.com/package/@meanie/mongoose-upsert-many

Hope it helps!
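
Based on the package README, usage looks roughly like the sketch below; treat the exact registration options and the upsertMany signature as assumptions and check the links above for the current API:

const upsertMany = require('@meanie/mongoose-upsert-many');

// register the plugin on the schema before compiling the model
mySchema.plugin(upsertMany);
const MyModel = mongoose.model('MyModel', mySchema);

// upsert an array of items, matching existing documents on the given field(s)
const result = await MyModel.upsertMany(items, ['id']);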

Adam Reis
1

If you're not seeing the bulk methods on your db.collection, i.e. you're getting an error to the effect of `xxx variable has no method: initializeOrderedBulkOp()`:

Try updating your mongoose version. Apparently older mongoose versions don't pass through all of the underlying mongo db.collection methods.

npm install mongoose

took care of it for me.
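
To quickly confirm whether your installed version passes the method through before upgrading, a one-line check works (a sketch; MyModel stands for any of your mongoose models):

// prints 'function' when the bulk helper is exposed, 'undefined' on older versions
console.log(typeof MyModel.collection.initializeOrderedBulkOp);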

zstew
0

I had to achieve this recently while storing products in my ecommerce app. My database used to time out as I had to upsert 10,000 items every 4 hours. One option for me was to set socketTimeoutMS and connectTimeoutMS in mongoose while connecting to the database, but it felt hacky and I did not want to manipulate the database's connection timeout defaults. I also see that the solution by @neil-lunn takes a simple sync approach of taking a modulus inside the for loop. Here is an async version of mine that I believe does the job much better.

let BATCH_SIZE = 500
Array.prototype.chunk = function (groupsize) {
    var sets = [];
    var chunks = this.length / groupsize;

    for (var i = 0, j = 0; i < chunks; i++, j += groupsize) {
        sets[i] = this.slice(j, j + groupsize);
    }

    return sets;
}

function upsertDiscountedProducts(products) {

    //Take the input array of products and divide it into chunks of BATCH_SIZE

    let chunks = products.chunk(BATCH_SIZE), current = 0

    console.log('Number of chunks ', chunks.length)

    let bulk = models.Product.collection.initializeUnorderedBulkOp();

    //Get the current time as timestamp
    let timestamp = new Date(),

        //Keep track of the number of items being looped
        pendingCount = 0,
        inserted = 0,
        upserted = 0,
        matched = 0,
        modified = 0,
        removed = 0,

        //If at least one upsert was performed
        upsertHappened = false;

    //Call the load function to get started
    load()
    function load() {

        //If we have a chunk to process
        if (current < chunks.length) {
            console.log('Current value ', current)

            for (let i = 0; i < chunks[current].length; i++) {
                //For each item set the updated timestamp to the current time
                let item = chunks[current][i]

                //Set the updated timestamp on each item
                item.updatedAt = timestamp;

                bulk.find({ _id: item._id })
                    .upsert()
                    .updateOne({
                        "$set": item,

                        //If the item is being newly inserted, set a created timestamp on it
                        "$setOnInsert": {
                            "createdAt": timestamp
                        }
                    })
            }

            //Execute the bulk operation for the current chunk
            bulk.execute((error, result) => {
                if (error) {
                    console.error('Error while inserting products: ' + JSON.stringify(error))
                    next()
                }
                else {

                    //At least one upsert has happened
                    upsertHappened = true;
                    inserted += result.nInserted
                    upserted += result.nUpserted
                    matched += result.nMatched
                    modified += result.nModified
                    removed += result.nRemoved

                    //Move to the next chunk
                    next()
                }
            })
        }
        else {
            console.log("Calling finish")
            finish()
        }

    }

    function next() {
        current++;

        //Reassign bulk to a new object and call load once again on the new object after incrementing chunk
        bulk = models.Product.collection.initializeUnorderedBulkOp();
        setTimeout(load, 0)
    }

    function finish() {

        console.log('Inserted', inserted, 'Upserted', upserted, 'Matched', matched, 'Modified', modified, 'Removed', removed)

        //If at least one chunk was inserted, remove all items with a 0% discount or items not updated in the latest upsert
        if (upsertHappened) {
            console.log("Calling remove")
            remove()
        }


    }

    /**
     * Remove all the items that were not updated in the recent upsert or those items with a discount of 0
     */
    function remove() {

        models.Product.remove(
            {
                "$or":
                [{
                    "updatedAt": { "$lt": timestamp }
                },
                {
                    "discount": { "$eq": 0 }
                }]
            }, (error, obj) => {
                if (error) {
                    console.log('Error while removing', JSON.stringify(error))
                }
                else {
                    if (obj.result.n === 0) {
                        console.log('Nothing was removed')
                    } else {
                        console.log('Removed ' + obj.result.n + ' documents')
                    }
                }
            }
        )
    }
}
PirateApp
  • @neil-lunn your solution, if I am not mistaken, creates multiple bulk objects together and they all execute asynchronously, but I made mine such that it has only one bulk.execute at a given time – PirateApp Jan 28 '17 at 07:13
  • 1
    As I understand you are processing batches in serial. I think that is correct to ensure the memory doesn't get overloaded. But with only one batch at a time, sometimes your DB will be waiting for the network, and sometimes the network will be waiting for the CPU. Running 5-10 smaller batches in parallel (with a new batch starting in serial each time an earlier batch completes) may give a small increase in throughput, by ensuring all parts of the system that can be doing work are doing work. – joeytwiddle Jan 31 '17 at 01:52
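
A sketch of the bounded-parallel variant described in the comment above, using async.eachLimit so that at most 5 batches are in flight at once (the chunks array and models.Product are from the answer above; the limit of 5 is an assumption):

const async = require('async')

// run up to 5 bulk.execute() calls concurrently; a new batch starts
// whenever an earlier one completes, until every chunk is written
async.eachLimit(chunks, 5, (chunk, done) => {
    const bulk = models.Product.collection.initializeUnorderedBulkOp()
    chunk.forEach(item => {
        bulk.find({ _id: item._id }).upsert().updateOne({ "$set": item })
    })
    bulk.execute(done)
}, err => {
    if (err) console.error('Bulk upsert failed', err)
    else console.log('All chunks written')
})
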
0

You can use mongoose's Model.bulkWrite()

const res = await Character.bulkWrite([
  {
    updateOne: {
      filter: { name: 'Will Riker' },
      update: { age: 29 },
      upsert: true
    }
  },
  {
    updateOne: {
      filter: { name: 'Geordi La Forge' },
      update: { age: 29 },
      upsert: true
    }
  }
]);

Reference: https://masteringjs.io/tutorials/mongoose/upsert