
I'm trying to listen to a MongoDB collection: for all docs where the field category is an empty list, do some stuff to fill the category field, and then keep listening for the docs that come in afterwards.

Based on these (rather old) questions:

How to listen for changes to a MongoDB collection?

with Python is there a way to listen for changes when insert or update is made in mongodb

I came up with the following:

    import pprint
    import time
    import traceback

    import pymongo

    client = pymongo.MongoClient(mongodb_uri_connexion_string)
    db = client.get_default_database()
    my_collection = db['my_collection']

    cursor = my_collection.find({'category': []}, cursor_type=pymongo.cursor.CursorType.EXHAUST)

    while True:
        try:
            doc = cursor.next()
            # place where computation gets done to find the value of new_cat
            my_collection.find_one_and_update({'_id': doc['_id']}, {'$push': {'category': {new_cat: '1'}}})
            pprint.pprint(my_collection.find_one({'_id': doc['_id']}))

        except StopIteration:
            print("end of cursor, wait")
            time.sleep(0.2)
        except Exception:
            print("another problem came up, see below")
            print(traceback.format_exc())

When I run the script, it does what it should: it prints the docs with the updated category, but only until it has consumed the documents that were in the cursor when the find() line ran. It does not pick up new docs inserted into my_collection afterwards; instead, it prints "end of cursor, wait" endlessly.

If I change the CursorType to TAILABLE_AWAIT, it doesn't even seem to run the computations in the try block, and goes straight to printing "end of cursor, wait" endlessly.

What am I doing wrong? Thanks.
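For context, the first symptom matches how MongoDB cursors behave: once a regular cursor (EXHAUST included) has been exhausted it is closed, so further next() calls keep raising StopIteration and never see documents inserted later. Tailable cursors (TAILABLE / TAILABLE_AWAIT) are only supported on capped collections, which would have to be created as such, e.g. with db.create_collection(name, capped=True, size=...). The sketch below shows the usual tailing loop, assuming `collection` is a PyMongo Collection backed by a capped collection and `cursor_type` is pymongo.CursorType.TAILABLE_AWAIT; the function name and the `handle` / `max_idle_rounds` parameters are hypothetical. Note that capped collections reject updates that change a document's size, so growing the category list in place would fail there.

```python
import time


def tail_capped_collection(collection, query, handle, cursor_type,
                           idle_sleep=1.0, max_idle_rounds=None):
    """Follow a capped collection with a tailable cursor (sketch).

    Assumptions: `collection` is a pymongo Collection on a *capped*
    collection and `cursor_type` is pymongo.CursorType.TAILABLE_AWAIT.
    `handle` and `max_idle_rounds` are hypothetical, for illustration only.
    """
    cursor = collection.find(query, cursor_type=cursor_type)
    idle_rounds = 0
    # A tailable cursor stays alive after returning its last document.
    while cursor.alive:
        got_any = False
        # With TAILABLE_AWAIT the server waits briefly for new documents;
        # this for-loop ends when none arrive in that window.
        for doc in cursor:
            got_any = True
            handle(doc)
        if not got_any:
            idle_rounds += 1
            if max_idle_rounds is not None and idle_rounds >= max_idle_rounds:
                break
        time.sleep(idle_sleep)
```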

François M.

1 Answer


As of now, the best way to listen for changes in MongoDB is to use change streams, which require a replica set (they also work on sharded clusters, but not on a standalone server). A replica set is a redundancy feature: if the primary database fails, a secondary copy takes over seamlessly.

Anyway, the MongoDB documentation is a bit convoluted for the purposes of this question, so here is a simple step-by-step guide:

  1. On an installed MongoDB, open the configuration file /etc/mongod.conf, add the following lines, and restart mongod so the setting takes effect:

         replication:
             replSetName: "rs0"

  2. Now open the mongo shell and type:

         > rs.initiate()

This should give output similar to the following:

    {
        "info2" : "no configuration specified. Using a default configuration for the set",
        "me" : "127.0.0.1:27017",
        "ok" : 1,
        "$clusterTime" : {
            "clusterTime" : Timestamp(1613644807, 2),
            "signature" : {
                "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
                "keyId" : NumberLong(0)
            }
        },
        "operationTime" : Timestamp(1613644807, 1)
    }

The shell prompt should now change to rs0:PRIMARY>.
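The same check can be done from Python before calling watch(). A minimal sketch: the helper name is hypothetical, and it reads the reply of the server's hello command (older servers only answer isMaster, whose reply uses ismaster instead of isWritablePrimary). A missing setName means a standalone server, on which change streams are unavailable.

```python
def replica_set_status(hello_reply):
    """Return (set_name, is_primary) from a `hello` / `isMaster` reply dict.

    set_name is None on a standalone server, where collection.watch()
    (change streams) is not available.
    """
    set_name = hello_reply.get("setName")
    # `hello` reports isWritablePrimary; the legacy `isMaster` reports ismaster.
    is_primary = hello_reply.get("isWritablePrimary",
                                 hello_reply.get("ismaster", False))
    return set_name, bool(is_primary)


# Usage (assumes pymongo and the local server configured above):
#   client = pymongo.MongoClient("mongodb://127.0.0.1:27017")
#   print(replica_set_status(client.admin.command("hello")))
```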

The replica set is now initiated, and you can use change streams to listen for MongoDB insertions. This is quite straightforward in PyMongo. Adapting your example, the following code listens for insertions on the collection "coll" and, if an inserted document has no "category" field, adds "category" with the string value "None".

Python code:

    import pymongo

    mongoclient = pymongo.MongoClient("mongodb://127.0.0.1:27017")
    db = mongoclient['My_DB']
    coll = db['coll']
    # Only receive insert events; change streams need the replica set set up above
    with coll.watch([{'$match': {'operationType': 'insert'}}]) as change_stream:
        for insert_change in change_stream:  # blocks until the next insert
            print(insert_change)
            inserted_entry = insert_change['fullDocument']  # the newly inserted document
            if 'category' not in inserted_entry:
                coll.update_one(inserted_entry, {"$set": {"category": "None"}})

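Note: you can pass "inserted_entry" directly as the query dictionary because it already contains all the fields of that record, including its _id. Filtering on {'_id': inserted_entry['_id']} alone is also enough, and keeps working even if another field of the document changes before the update runs.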
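If the listener is restarted, inserts made while it is down would be missed. Change streams can resume from a saved point: each change event's _id is a resume token that can be passed back to watch() as resume_after. A sketch, assuming the token is persisted somewhere (load_token and save_token are hypothetical callbacks, e.g. backed by another collection) and that the oplog still covers the saved position; handle stands in for the update logic above.

```python
def watch_inserts(collection, handle, load_token, save_token, max_events=None):
    """Process insert events, remembering where we stopped (sketch).

    `collection` is assumed to be a pymongo Collection; load_token,
    save_token and max_events are hypothetical, for illustration only.
    """
    pipeline = [{"$match": {"operationType": "insert"}}]
    # resume_after=None starts from "now"; a saved token resumes after that event.
    with collection.watch(pipeline, resume_after=load_token()) as stream:
        for count, change in enumerate(stream, start=1):
            handle(change["fullDocument"])
            # The event's _id is its resume token; save it only after handling.
            save_token(change["_id"])
            if max_events is not None and count >= max_events:
                break
```

Saving the token only after handle() returns means an event may be handled twice after a crash but never skipped, which is fine for an idempotent update such as the $set above.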

In the MongoDB shell, with the Python script running:

    rs0:PRIMARY> use My_DB
    switched to db My_DB
    rs0:PRIMARY> db.coll.insert({"Name":"Jack"})
    WriteResult({ "nInserted" : 1 })
    rs0:PRIMARY> db.coll.find()
    { "_id" : ObjectId("602e61bc0dc4aa57751b5e02"), "Name" : "Jack", "category" : "None" }
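The listener above only reacts to new inserts, while the question also asks about documents that already exist with an empty category. One way to cover both, sketched under assumptions: the change stream is opened first (PyMongo starts it when watch() is called), then the existing documents are fixed, then the stream is consumed, so a document inserted during the backfill is not missed. Each update is conditional on the category still being missing or an empty list, so a document seen by both phases is only filled once, as long as compute_category returns a non-empty value. compute_category is a hypothetical stand-in for the question's elided computation, and max_events exists only so the loop can be stopped in a test.

```python
def needs_category(prefix=""):
    """Query for 'category is missing or an empty list' (prefix for change events)."""
    field = prefix + "category"
    return {"$or": [{field: {"$exists": False}}, {field: []}]}


def fill_category(collection, doc, compute_category):
    """Set the category on one document, but only if it still needs one."""
    collection.update_one({"_id": doc["_id"], **needs_category()},
                          {"$set": {"category": compute_category(doc)}})


def backfill_then_watch(collection, compute_category, max_events=None):
    """Fix existing documents, then keep fixing new inserts (sketch)."""
    # Server-side filter: only inserts whose document still needs a category.
    pipeline = [{"$match": {"operationType": "insert",
                            **needs_category("fullDocument.")}}]
    # Open the stream before the backfill so inserts made meanwhile are not lost.
    with collection.watch(pipeline) as stream:
        # 1) Documents that already exist with a missing or empty category.
        for doc in collection.find(needs_category()):
            fill_category(collection, doc, compute_category)
        # 2) New inserts as they arrive (blocks while waiting).
        for handled, change in enumerate(stream, start=1):
            fill_category(collection, change["fullDocument"], compute_category)
            if max_events is not None and handled >= max_events:
                break
```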
JackX
  • I only get this output { info2: 'no configuration specified. Using a default configuration for the set', me: '127.0.0.1:27017', ok: 1 }, but when I watch() a collection it works. – Sins97 Mar 28 '23 at 16:09