226

I'm creating a sort of background job queue system with MongoDB as the data store. How can I "listen" for inserts to a MongoDB collection before spawning workers to process the job?

Do I need to poll every few seconds to see if there are any changes from last time, or is there a way my script can wait for inserts to occur?

This is a PHP project that I am working on, but feel free to answer in Ruby or language agnostic.

Sunil Garg
Andrew
    Change Streams was added in MongoDB 3.6 to address your scenario. https://docs.mongodb.com/manual/changeStreams/ Also if you are using MongoDB Atlas you can leverage Stitch Triggers which allow you to execute functions in response to insert/update/deletes/etc. https://docs.mongodb.com/stitch/triggers/overview/ No more needing to parse the oplog. – Robert Walters Oct 04 '18 at 12:06

12 Answers

118

What you are thinking of sounds a lot like triggers. MongoDB does not have any support for triggers; however, some people have "rolled their own" using a few tricks. The key here is the oplog.

When you run MongoDB in a Replica Set, all of the MongoDB actions are logged to an operations log (known as the oplog). The oplog is basically just a running list of the modifications made to the data. Replica Sets function by listening to changes on this oplog and then applying the changes locally.

Does this sound familiar?

I cannot detail the whole process here; it spans several pages of documentation, but the tools you need are available.

First, some write-ups on the oplog:

  • Brief description
  • Layout of the local collection (which contains the oplog)

You will also want to leverage tailable cursors. These will provide you with a way to listen for changes instead of polling for them. Note that replication uses tailable cursors, so this is a supported feature.
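To make the approach concrete, here is a Python sketch of tailing the oplog with PyMongo (my own illustration, not code from the answer): `handle_insert` and the URI are placeholders, and it assumes `mongod` was started with `--replSet` so that `local.oplog.rs` exists.

```python
def is_insert(entry):
    """Oplog entries carry an 'op' field; 'i' marks an insert."""
    return entry.get("op") == "i"

def tail_oplog(handle_insert, uri="mongodb://localhost:27017"):
    # Imported here so the pure helper above has no dependencies.
    from pymongo import CursorType, MongoClient

    oplog = MongoClient(uri).local["oplog.rs"]
    # TAILABLE_AWAIT makes the server hold the cursor open and block
    # for new data, instead of the client polling.
    cursor = oplog.find(cursor_type=CursorType.TAILABLE_AWAIT)
    while cursor.alive:
        for entry in cursor:
            if is_insert(entry):
                handle_insert(entry["o"])  # 'o' holds the inserted document

if __name__ == "__main__":
    tail_oplog(print)
```

A worker process could call `tail_oplog` with a callback that spawns a job for each inserted document.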

jtlindsey
Gates VP
    hmm...not exactly what I had in mind. I am only running one instance at this point (no slaves). So maybe a more basic solution? – Andrew Mar 13 '12 at 22:18
  • 20
    You can start the server with the `--replSet` option and it will create / populate the `oplog`. Even without the secondary. This is definitely the only way to "listen" to changes in the DB. – Gates VP Mar 13 '12 at 22:36
  • 2
    This is a nice description how to setup oplog for logging changes to DB locally: http://loosexaml.wordpress.com/2012/09/03/how-to-get-a-mongodb-oplog-without-a-full-replica-set/ – johndodo Dec 30 '14 at 14:35
  • Cooooool! That's really what I want. And I found a library named 'mongo-oplog' on npm. So happy~ – pjincz Dec 11 '16 at 20:14
  • I agree by the time of writing this answer triggers might not be available but to all who land here, There is an option available now, Check out MongoDB Stitch (https://docs.mongodb.com/stitch/#stitch) & Stitch triggers (https://docs.mongodb.com/stitch/triggers/).. – whoami - fakeFaceTrueSoul May 11 '20 at 02:06
  • this answer is outdated. Mongodb Realm supports triggers – ChatGPT Aug 18 '20 at 07:33
106

MongoDB has what are called capped collections and tailable cursors, which allow MongoDB to push data to listeners.

A capped collection is essentially a collection that is a fixed size and only allows insertions. Here's what it would look like to create one:

db.createCollection("messages", { capped: true, size: 100000000 })
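For reference, the same capped-collection creation from PyMongo (a sketch of mine; a running `mongod` is required to actually execute it):

```python
def capped_options(size_bytes=100_000_000):
    """Options matching: db.createCollection("messages", {capped: true, size: 100000000})"""
    return {"capped": True, "size": size_bytes}

if __name__ == "__main__":
    from pymongo import MongoClient  # requires a running mongod
    db = MongoClient().my_db
    db.create_collection("messages", **capped_options())
```

Note that `size` is in bytes; the collection keeps a fixed footprint and only supports insertions, which is what makes tailable cursors possible.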

MongoDB Tailable cursors (original post by Jonathan H. Wage)

Ruby

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db');
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python (by Robert Stewart)

import time

from pymongo import CursorType, MongoClient

db = MongoClient().my_db
coll = db.my_collection
cursor = coll.find(cursor_type=CursorType.TAILABLE)
while cursor.alive:
    try:
        doc = cursor.next()
        print(doc)
    except StopIteration:
        time.sleep(1)

Perl (by Max)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Additional Resources:

Ruby/Node.js Tutorial which walks you through creating an application that listens to inserts in a MongoDB capped collection.

An article talking about tailable cursors in more detail.

PHP, Ruby, Python, and Perl examples of using tailable cursors.

iamktothed
Andrew
    sleep 1? really? for production code? how is that not polling? – rbp Sep 13 '13 at 12:36
  • 2
    @rbp haha, I never said it was production code, but you're right, sleeping for a second is not a good practice. Pretty sure I got that example from somewhere else. Not sure how to refactor it though. – Andrew Sep 13 '13 at 16:16
  • 2
    lol, he was just showing tailable cursors! he did his job, why bother with sleep 1!! is by far the most irrelevant thing on this post!! was a great answer! – kroe Dec 22 '14 at 06:19
  • 14
    @kroe because those irrelevant details will get put into production code by newer programmers that may not understand why it's bad. – Catfish Jan 14 '15 at 17:35
  • 4
    I understand your point, but expecting some new programmers to add "sleep 1" to production is almost offensive! I mean, i wouldn't be surprised... But if someone puts this in production, at least will learn the hard way and forever.. hahaha – kroe Jan 14 '15 at 18:22
  • 25
    what's wrong with doing time.sleep(1) in production? – Al Johri Apr 30 '16 at 00:19
  • 1
    for pymongo with mongodb v 3.2, cursor changes to " cursor = coll.find (cursor_type=cursor.CursorType.TAILABLE_AWAIT) " – Aruldd Jul 13 '16 at 10:40
  • 2
    for ruby, with the new (2.3) driver: `collection.find({query}, {tailable: true, await_data: true}).each {|document| ...}` Also check http://shtylman.com/post/the-tail-of-mongodb/ where you can learn that you need an initial data in the cursor to start tailing – Jaffa Aug 25 '16 at 12:59
  • 1
    sleep(1) is fine. – justin.m.chase Oct 04 '18 at 16:09
  • Does it work inside of a transaction? If I have written data into the database, transaction is committed, and then validation task is executed and returned invalid data? In other words, validation fails after the data have been inserted – Yan Khonski Feb 14 '19 at 10:13
55

Check out this: Change Streams

January 10, 2018 - Release 3.6

EDIT: I wrote an article about how to do this: https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


It's new in mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

In order to use change streams, the deployment must be a replica set.

More about replica sets: https://docs.mongodb.com/manual/replication/

Your database will be a "standalone" by default.

How to convert a standalone to a replica set: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


The following example is a practical application of how you might use this, specifically for Node.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Useful links:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams
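The same pattern transposed to PyMongo (my sketch, not from the answer; `my_db`/`users` are placeholder names and a replica-set deployment is assumed):

```python
def describe_change(change):
    """Reduce a change-stream event to (operationType, document _id)."""
    key = change.get("documentKey") or {}
    return change["operationType"], key.get("_id")

def watch_users(uri="mongodb://localhost:27017/?replicaSet=rs0"):
    from pymongo import MongoClient  # requires a replica-set deployment

    users = MongoClient(uri).my_db.users
    # watch() blocks and yields one event per insert/update/delete/...
    with users.watch() as stream:
        for change in stream:
            print(describe_change(change))

if __name__ == "__main__":
    watch_users()
```

In the socket.io example above, the body of the `for` loop is where you would re-query and emit to connected clients.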

Rio Weber
48

Since MongoDB 3.6 there is a new notifications API called Change Streams which you can use for this. See this blog post for an example; note that the snippet below is from a pre-release draft of the API, whose method was ultimately named `watch()` rather than `changes()`. Example from it:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
Vince Bowdren
Mitar
25

MongoDB version 3.6 now includes change streams, which are essentially an API on top of the oplog allowing for trigger/notification-like use cases.

Here is a link to a Java example: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

A NodeJS example might look something like:

var MongoClient = require('mongodb').MongoClient;

MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
  .then(function (client) {
    let db = client.db('MyStore');

    let changeStreams = db.collection('products').watch();
    changeStreams.on('change', function (change) {
      console.log(JSON.stringify(change));
    });
  });
Robert Walters
4

Alternatively, you could use the standard Mongo `findAndModify` method, and within the callback fire an EventEmitter event (in Node) when the callback runs.

Any other parts of the application or architecture listening to this event will be notified of the update, and any relevant data sent there also. This is a really simple way to achieve notifications from Mongo.
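The pattern described above, sketched in Python (a hypothetical `JobQueue` wrapper of my own, not a MongoDB API):

```python
class JobQueue:
    """Hypothetical wrapper: run a write, then notify listeners (the
    EventEmitter pattern described above, minus the actual Mongo call)."""

    def __init__(self):
        self._listeners = []

    def on_update(self, callback):
        self._listeners.append(callback)

    def find_and_update(self, doc):
        # In a real app this would call collection.find_one_and_update(...)
        # and pass the returned document to the listeners.
        for callback in self._listeners:
            callback(doc)

queue = JobQueue()
seen = []
queue.on_update(seen.append)
queue.find_and_update({"job": 1, "state": "done"})
```

The trade-off is that only writers going through this wrapper trigger notifications; writes made elsewhere are invisible, which is why the oplog/change-stream answers are more general.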

Alex
  • this is very inefficient..you're locking the db for each FindAndUpdate! – Yash Gupta Nov 21 '15 at 16:22
  • 1
    My guess is that Alex was answering a slightly different (not specifically addressing inserts) but related question as in how to fire off some kind of notification to clients when the state of a queued job changes which we assume will need to happen as jobs are spawned, complete successfully or fail. With clients connected using websockets to node, they can all be notified of changes with a broadcast event on the FIndAndUpdate callback which could be called when receive state change messages. I would say that this isn't inefficient as the updates need to be done. – Peter Scott Dec 28 '15 at 21:20
3

Many of these answers will only give you new records, not updates, and/or are extremely inefficient.

The only reliable, performant way to do this is to create a tailable cursor on the `oplog.rs` collection in the `local` database to get ALL changes to MongoDB and act on them as you see fit. (MongoDB even does this internally, more or less, to support replication!)

Explanation of what the oplog contains: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Example of a Node.js library that provides an API around what is available to be done with the oplog: https://github.com/cayasso/mongo-oplog

John Culviner
3

There is an awesome set of services available called MongoDB Stitch. Look into Stitch functions/triggers. Note that this is a cloud-based paid service hosted on AWS. In your case, on an insert, you could call a custom function written in JavaScript.

(screenshot of the Stitch triggers interface)

Manish Jain
  • https://stackoverflow.com/users/486867/manish-jain - do you have an example of how stitch can be used to notify a REACT application that data was inserted into a table ? – MLissCetrus Apr 10 '20 at 20:57
1

Actually, instead of watching output, you can get notified when something new is inserted by using the middleware hooks provided by a Mongoose schema.

You can catch the insert event for a new document and do something after the insertion has completed.

Duong Nguyen
0

There is a working Java example, which can be found here.

MongoClient mongoClient = new MongoClient();
DBCollection coll = mongoClient.getDB("local").getCollection("oplog.rs");

DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
        .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

System.out.println("== open cursor ==");

Runnable task = () -> {
    System.out.println("\tWaiting for events");
    while (cur.hasNext()) {
        DBObject obj = cur.next();
        System.out.println(obj);
    }
};
new Thread(task).start();

The key is the query options used here: QUERYOPTION_TAILABLE | QUERYOPTION_AWAITDATA.

You can also narrow the find query if you don't need to load all the data every time:

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
        .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
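The same filter expressed as a PyMongo-style query document (my sketch; `start_ts` is a placeholder for whatever timestamp you last processed):

```python
def oplog_filter(start_ts, op="i"):
    """Build the same oplog query as the Java snippet above:
    entries newer than start_ts whose operation type is op ('i' = insert)."""
    return {"ts": {"$gt": start_ts}, "op": op}

if __name__ == "__main__":
    from bson.timestamp import Timestamp  # ships with PyMongo
    print(oplog_filter(Timestamp(1471952088, 1)))
```

Persisting the last-seen `ts` between runs lets a worker resume where it left off instead of re-reading the whole oplog.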
Maleen Abewardana
0

Since MongoDB 3.6 (via Atlas), you can use the following database trigger types:

  • event-driven triggers - useful for updating related documents automatically, notifying downstream services, propagating data to support mixed workloads, and data integrity & auditing
  • scheduled triggers - useful for scheduled data retrieval, propagation, archival and analytics workloads

Log into your Atlas account, select the Triggers interface, and add a new trigger:

(screenshot of the Add Trigger configuration page)

Expand each section for more settings or details.

gotqn
0

For anyone looking for a C# example of change streams, here is one that also demonstrates the resume token. Note that change streams require a replica set, so this will not work against a default standalone mongod (for example, a stock MongoDB Docker container) unless you configure it as a single-node replica set; a MongoDB Atlas cluster satisfies the requirements out of the box. See the documentation on Change Streams, which states that change streams require:

  1. The database must be in a replica set or sharded cluster.
  2. The database must use the WiredTiger storage engine.
  3. The replica set or sharded cluster must use replica set protocol version 1.

Put this in a console application and add the MongoDB.Driver NuGet package.

// Requires MongoDB.Driver NuGet package.
using MongoDB.Driver;
using MongoDB.Bson;

Console.WriteLine("Monitor has started....");

string connectionString = "--your MongoDB Atlas connection string here---";

if (connectionString.StartsWith("--"))
    throw new ArgumentException("Please update the MongoDB atlas connection string!");

string databaseName = "simple_db"; // TODO: Update with your db name
string collectionName = "people";  // TODO: Update with your collection name

var client = new MongoClient(connectionString);

var tokenSource = new CancellationTokenSource();
string? resumeToken = null; // We spit out a resume token below to the console.
await MonitorOneDatabasesAsync(client, databaseName, resumeToken, tokenSource.Token);

Console.WriteLine("Monitor has exited!!");

static async Task MonitorOneDatabasesAsync(IMongoClient client, string monitorDatabaseName, string? resumeToken,
    CancellationToken cancellationToken = default)
{
    var options = new ChangeStreamOptions
    {
        FullDocument = ChangeStreamFullDocumentOption.Default, 
        ResumeAfter = string.IsNullOrWhiteSpace(resumeToken) ? null : new BsonDocument().Add("_data", resumeToken)
    };
   

    IChangeStreamCursor<ChangeStreamDocument<BsonDocument>> streamCursor = await client
        .GetDatabase(monitorDatabaseName)
        .WatchAsync(options, cancellationToken);
    
    foreach (ChangeStreamDocument<BsonDocument> changeItem in streamCursor.ToEnumerable())
    {
        Console.WriteLine($"Key that changed: {changeItem.DocumentKey}  Operation Type: {changeItem.OperationType}");
        Console.WriteLine($"Resume Token: {changeItem.ResumeToken["_data"]}");

        // Delete doesn't send the full document!
        if (changeItem.FullDocument != null)
        {
            // Show all the fields on the document.
            foreach (string name in changeItem.FullDocument.Names)
            {
                Console.WriteLine($"  {name}: {changeItem.FullDocument[name]}");
            }
        }

        if (cancellationToken.IsCancellationRequested)
            break;
    }
}
David Yates