
I have 2 collections:

  • Clients (6 000 000 documents)
  • Orders (50 000 000 documents)

Once a day, I would like to calculate, per client, the number of orders placed in the past year, past month, and past week.

I tried this:

db.orders.aggregate([
    { $match:
        { date_order: { $gt: v_date1year } }
    },
    { $group: {
        _id: "$id_client",
        count: { $sum: 1 }
    }},
    { $out: "tmp_indicators" }
])

db.tmp_indicators.find({}).forEach(function (my_client) {
    db.clients.update(
        { "id_client": my_client._id },
        { "$set":
            { "nb_orders_1year": my_client.count }
        }
    )
})

I have to do this three times: once for the past-year aggregation, once for the past month, and once for the past week. The treatment is very slow; do you have an idea of how to perform it in a better way?
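(Side note: the v_date* threshold variables are not defined in the question; presumably they are computed in the shell beforehand, for example along these lines:)

// Hypothetical definitions for the date thresholds used above (not shown in the
// question); the variable names just follow the question's convention.
var now = new Date();
var v_date1year  = new Date(now.getTime()); v_date1year.setFullYear(now.getFullYear() - 1);
var v_date1month = new Date(now.getTime()); v_date1month.setMonth(now.getMonth() - 1);
var v_date1week  = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000);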

asked by Mouette

1 Answer


For improved performance, especially when dealing with large collections, take advantage of the Bulk() API for bulk updates. You send the operations to the server in batches (for example, a batch size of 1000), which gives you much better performance: instead of sending every request to the server individually (as you are currently doing with the update statement inside the forEach() loop), you send one request per 1000 operations, making your updates more efficient and quicker than they currently are.

The following examples demonstrate this approach. The first one uses the Bulk() API, available in MongoDB versions >= 2.6 and < 3.2, to update all the documents in the clients collection by setting the nb_orders_1year field to the value from the aggregation results.

Since the aggregation writes its results to the tmp_indicators collection via $out, you can use that output collection's find() cursor and its forEach() method to iterate the results and access each document, setting up the bulk update operations in batches that are then sent to the server efficiently with the API:

var bulk = db.clients.initializeUnorderedBulkOp(),
    pipeline = [
        {
            "$match": { "date_order": { "$gt": v_date1year } }
        },
        {
            "$group": {
                "_id": "$id_client", 
                "count": { "$sum" : 1 }
            }
        },
        { "$out": "tmp_indicators" }        
    ],
    counter = 0;

db.orders.aggregate(pipeline);  
db.tmp_indicators.find().forEach(function (doc) {       
    bulk.find({ "_id": doc._id }).updateOne({ 
        "$set": { "nb_orders_1year": doc.count }
    });

    counter++;
    if (counter % 1000 == 0) {
        bulk.execute(); // Execute per 1000 operations and re-initialize every 1000 update statements
        bulk = db.clients.initializeUnorderedBulkOp();
    }
});
// Clean up remaining operations in queue
if (counter % 1000 != 0) { bulk.execute(); }

The next example applies to MongoDB version 3.2, which has since deprecated the Bulk() API and provides a newer set of APIs using bulkWrite().

It uses the same aggregation output as above, but instead of iterating the result with forEach(), it builds the array of bulk operations using the cursor's map() method:

var pipeline = [
    {
        "$match": { "date_order": { "$gt": v_date1year } }
    },
    {
        "$group": {
            "_id": "$id_client",
            "count": { "$sum": 1 }
        }
    },
    { "$out": "tmp_indicators" }
];
db.orders.aggregate(pipeline);
var bulkOps = db.tmp_indicators.find().map(function (doc) {
    return {
        "updateOne": {
            "filter": { "_id": doc._id },
            "update": { "$set": { "nb_orders_1year": doc.count } }
        }
    };
});

db.clients.bulkWrite(bulkOps, { "ordered": true });
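To cover all three periods the question asks about (year, month, week), one option is to wrap the pipeline and the bulk write in a small helper and call it once per period. This is only a sketch building on the answer above; the helper name, the counter field names and the v_date* variables are assumptions, not part of the original answer:

// Hypothetical helper: aggregate orders newer than sinceDate into tmp_indicators,
// then bulk-update the given counter field on the clients collection.
function refreshOrderCounts(sinceDate, fieldName) {
    db.orders.aggregate([
        { "$match": { "date_order": { "$gt": sinceDate } } },
        { "$group": { "_id": "$id_client", "count": { "$sum": 1 } } },
        { "$out": "tmp_indicators" }
    ]);

    var ops = db.tmp_indicators.find().map(function (doc) {
        var setDoc = {};
        setDoc[fieldName] = doc.count;
        // The filter follows the answer's convention (_id); use id_client instead
        // if that is the join key on the clients collection, as in the question.
        return { "updateOne": { "filter": { "_id": doc._id }, "update": { "$set": setDoc } } };
    });

    if (ops.length > 0) { db.clients.bulkWrite(ops, { "ordered": true }); }
}

refreshOrderCounts(v_date1year,  "nb_orders_1year");
refreshOrderCounts(v_date1month, "nb_orders_1month");
refreshOrderCounts(v_date1week,  "nb_orders_1week");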
answered by chridam
  • Thanks for the solution, but using a cursor doesn't work because of the document size limitation. I get this error: "exception: aggregation result exceeds maximum document size (16MB)". This is why I used the "$out" stage. – Mouette Feb 08 '16 at 08:43
  • @Mouette You are right, I had not factored that into my initial response, but I have since updated my answer to use the aggregation output collection's cursor. – chridam Feb 08 '16 at 09:04
  • In the first solution, what is the aim of executing the bulk every 1000 operations? Can't the bulk utility handle 6 000 000 updates (1 per client max) in one shot? – Mouette Feb 08 '16 at 13:59
  • With such a humongous batch size you are definitely bound to hit the 16MB BSON limit, hence choosing a smaller, manageable batch size. The default (and maximum) is 1000. – chridam Feb 08 '16 at 14:02
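Applying that batch-size advice to the bulkWrite() variant as well, the operations array can be flushed every 1000 documents instead of being built for all clients at once; a minimal sketch along the same lines as the first example:

// Sketch: send bulkWrite() batches of 1000 operations instead of one huge array.
var ops = [];
db.tmp_indicators.find().forEach(function (doc) {
    ops.push({
        "updateOne": {
            "filter": { "_id": doc._id },
            "update": { "$set": { "nb_orders_1year": doc.count } }
        }
    });
    if (ops.length === 1000) {
        db.clients.bulkWrite(ops, { "ordered": true });
        ops = [];  // start a new batch
    }
});
// Flush any remaining operations
if (ops.length > 0) { db.clients.bulkWrite(ops, { "ordered": true }); }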