9

My Node app uses Mongo change streams, and the app runs 3+ instances in production (more eventually, so this will become more of an issue as it grows). So, when a change comes in, the change stream handler runs as many times as there are processes.

How can I set things up so that the change stream handler only runs once per change?

Here's what I've got:

const options = { fullDocument: "updateLookup" };

const sitesFilter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(sitesFilter, options);

// Start listening to site stream
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );

  // Do work...
  console.log("site change stream done.");
  return;
});
PaulIsLoud
  • running Mongo 4.2.19, `mongodb@4.2.2` (driver) and at least two instances of the subscribing app – I see only a single consumption per change event – yentsun May 11 '22 at 03:06

6 Answers

5

Doing this with strong guarantees is difficult but not impossible. I wrote about the details of one solution here: https://www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html

The examples are in Java but the important part is the algorithm.

It comes down to a few techniques:

  • Each process attempts to obtain a lock
  • Each lock (or each change) has an associated fencing token
  • Processing each change must be idempotent
  • While processing the change, the token is used to ensure ordered, effectively-once updates.

More details in the blog post.
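
As a very rough sketch of the lock-plus-fencing-token idea in Node (the collection, field, and function names here are my own illustration, not from the post):

// A lock document, created up front as { _id: "sites-consumer", owner: null, token: 0 },
// holds the current owner and a monotonically increasing fencing token.
async function acquireLock(locks, lockId, owner, ttlMs = 30000) {
  const now = new Date();
  const res = await locks.findOneAndUpdate(
    { _id: lockId, $or: [{ owner: null }, { expiresAt: { $lt: now } }] },
    { $set: { owner, expiresAt: new Date(now.getTime() + ttlMs) }, $inc: { token: 1 } },
    { returnDocument: "after" }
  );
  return res.value; // null if another process currently holds the lock (driver 4.x shape)
}

// Downstream writes carry the token, so writes from a stale lock holder are rejected
// and reprocessing the same change stays idempotent.
async function applyChange(docs, change, token) {
  await docs.updateOne(
    {
      _id: change.documentKey._id,
      $or: [{ fenceToken: { $exists: false } }, { fenceToken: { $lte: token } }]
    },
    { $set: { fenceToken: token /* ...plus the actual idempotent update... */ } }
  );
}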

Alec Henninger
  • @Shubham no, unfortunately it is a Java library. But the algorithm and model could be ported to other languages like JS relatively easily I suspect. – Alec Henninger Sep 26 '20 at 18:32
4

It can easily be done with only MongoDB query operators. You can add a modulo query on the ID field where the divisor is the number of your app instances (N). The remainder is then an element of {0, 1, 2, ..., N-1}. If your app instances are numbered in ascending order from zero to N-1, you can write the filter like this:

const filter = [
  {
    "$match": {
      "$and": [
        // Other filters
        // Note: in a change stream pipeline the document's key lives under documentKey,
        // and $mod only works on numeric _id values (not ObjectIds).
        { "documentKey._id": { "$mod": [<number of instances>, <this instance's id>] } }
      ]
    }
  }
];
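
Each instance then needs to know the total instance count and its own index; a minimal sketch, assuming they are supplied via environment variables (the variable names are mine):

const instanceCount = parseInt(process.env.INSTANCE_COUNT, 10); // N
const instanceId = parseInt(process.env.INSTANCE_ID, 10);       // 0 .. N-1

const filter = [
  {
    $match: {
      $and: [
        { operationType: "update" },
        { "documentKey._id": { $mod: [instanceCount, instanceId] } }
      ]
    }
  }
];

const sitesStream = Client.watch(filter, { fullDocument: "updateLookup" });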
Marc
1

It sounds like you need a way to partition updates between instances. Have you looked into Apache Kafka? Basically, what you would do is have a single application that writes the change data to a partitioned Kafka topic and have your Node application be a Kafka consumer. This would ensure only one application instance ever receives a given update.

Depending on your partitioning strategy, you could even ensure that updates for the same record always go to the same node app (if your application needs to maintain its own state). Otherwise, you can spread out the updates in a round-robin fashion.

The biggest benefit to using Kafka is that you can add and remove instances without having to adjust configurations. For example, you could start one instance and it would handle all updates. Then, as soon as you start another instance, they each start handling half of the load. You can continue this pattern for as many instances as there are partitions (and you can configure the topic to have thousands of partitions if you want); that is the power of the Kafka consumer group. Scaling down works in reverse.
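
If you went this route, the consumer side might look roughly like this with the kafkajs package (the package choice, topic, and group names are assumptions, not part of the answer):

const { Kafka } = require("kafkajs");

const kafka = new Kafka({ clientId: "sites-worker", brokers: ["localhost:9092"] });
// Every app instance joins the same consumer group, so each partition (and therefore
// each change event) is delivered to exactly one instance at a time.
const consumer = kafka.consumer({ groupId: "sites-changes" });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: "site-changes" });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      const change = JSON.parse(message.value.toString());
      // Do work...
    }
  });
}

run();

Keying the producer's messages by the record ID is what gives the "same record always goes to the same instance" behaviour mentioned above.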

jteezy14
1

While the Kafka option sounded interesting, it was a lot of infrastructure work on a platform I'm not familiar with, so I decided to go with something a little closer to home for me: sending an MQTT message to a little stand-alone app, and letting that app check messages for uniqueness.

const mqtt = require("mqtt");

sitesStream.on("change", async change => {
  console.log("in site change stream");
  const mqttClient = mqtt.connect("mqtt://localhost:1883");
  const id = JSON.stringify(change._id._data);
  // You'll want to push more than just the change stream id obviously...
  mqttClient.on("connect", function() {
    mqttClient.publish("myTopic", id);
    mqttClient.end();
  });
});

I'm still working out the final version of the MQTT server, but the method to evaluate uniqueness of messages will probably store an array of change stream IDs in application memory, as there is no need to persist them, and evaluate whether to proceed any further based on whether that change stream ID has been seen before.

var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
  client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
  // Strip the quotes added by JSON.stringify on the publishing side
  var context = message.toString().replace(/"/g, "");
  if (seen.indexOf(context) < 0) {
    seen.push(context);
    // Do stuff
  }
});

This doesn't include security, etc., but you get the idea.

PaulIsLoud
  • Was this actually the best way to handle this in the end? I am currently facing the same issue where multiple instances are messing up my DB writes I'm trying to achieve in the watch function. This seems like adding a lot more complexity to what I originally thought was a simple implementation of change streams. :( – Ryann Galea Nov 11 '18 at 10:20
  • 1
    At this point, my mind goes towards running another node server with a connection to the database which handles all the watching, which is obviously limited to a single instance? – Ryann Galea Nov 11 '18 at 10:37
  • @RyannGalea: Hi guys, I'm running into the same situation. I'm writing a service which will be responsible for watching changes and publishing them to a RabbitMQ topic. But I still have some concerns, such as a single instance of this service being overloaded by MongoDB writes, resuming the change stream, etc. Anyway, how is your final solution? – pham cuong Feb 12 '19 at 08:10
  • There are some ways to distribute the load to multiple producers, such as basing it on the created time of the entity. Refer to Pluralsight's [course](https://app.pluralsight.com/player?course=mongodb-change-streams-driving-real-time-events-streaming-applications&author=nuri-halperin&name=65638e1a-95a3-4a4d-bd08-21b1999d509b&clip=0&mode=live). – pham cuong Feb 12 '19 at 08:24
  • So far it's been fine, but I think of it more as a stopgap solution for the time being. I believe you would need a pretty massive, non-stop volume of messages being sent to RabbitMQ before you would run into scale issues. Will be looking into the course from Pluralsight for a longer-term solution, thanks for sharing @pcuong – PaulIsLoud Feb 12 '19 at 12:39
1

What about having a field in the DB called status, which is updated using findOneAndUpdate based on the event received from the change stream? Let's say you get 2 events at the same time from the change stream. The first event will update the status to start, and the other will fail because the status is already start. So the second event will not run any business logic.
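
A rough sketch of that claim step, using an insert with a unique _id rather than findOneAndUpdate (same atomicity guarantee; the collection and field names here are my own, and db is assumed to be a connected Db handle):

async function tryClaim(db, change) {
  try {
    // The unique _id acts as the lock: the first insert wins, the rest hit a duplicate-key error.
    await db.collection("changeClaims").insertOne({
      _id: change._id._data, // the change event's resume token is unique per event
      status: "start",
      at: new Date()
    });
    return true;
  } catch (err) {
    if (err.code === 11000) return false; // already claimed by another instance
    throw err;
  }
}

sitesStream.on("change", async change => {
  if (!(await tryClaim(db, change))) return; // lost the race, skip the business logic
  // Do work...
});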

iAviator
  • what if they both read the status at the same time and then both of them change it ... it is a bad workaround solution – Ameur Baccoucha Aug 10 '20 at 10:09
  • So only one will be able to change it, as the status field will be updated to `end`, and when the other tries to change it, it will fail. This is something you can guarantee in MongoDB, as updates are atomic. – iAviator Aug 10 '20 at 12:54
  • This seems like the best approach. All receivers of the change stream try to flip the flag, and the only one that succeeds processes the event. The only concern is if the change event's `_id` field is stable for each event. – Tunji_D Feb 05 '22 at 18:43
  • This is what I do: Each instance gets its own ID (uuidv4, something random). Instead of sending "start" what you do is make status an object {id:, timestamp:new Date()}. So first time you get the value and see if the status is undefined/null/etc, if not, you write your own uuid and timestamp to the property. Afterwards, you read that value and if the ids match, you continue. The timestamp is for checking abandoned, e.g. if the timestamp is > 10 minutes old assume its false. – JonShipman Aug 18 '22 at 19:47
0

I'm not claiming these are rock-solid, production-grade solutions, but I believe something like this could work.

Solution 1

applying Read-Modify-Write:

  1. Add a version field to the document; all newly created docs have version=0
  2. Receive ChangeStream event
  3. Read the document that needs to be updated
  4. Perform the update on the model
  5. Increment version
  6. Update the document where both id and version match, otherwise discard the change (see the sketch after this list)
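
A minimal sketch of steps 3–6 with the Node driver (the collection name and the applyBusinessLogic helper are my own illustration):

async function handleChange(db, change) {
  const docs = db.collection("sites");

  // 3. Read the document that needs to be updated
  const doc = await docs.findOne({ _id: change.documentKey._id });
  if (!doc) return;

  // 4. Perform the update on the model (hypothetical helper)
  const updatedFields = applyBusinessLogic(doc);

  // 5./6. Increment the version and write back only if nobody else got there first
  const res = await docs.updateOne(
    { _id: doc._id, version: doc.version },
    { $set: { ...updatedFields, version: doc.version + 1 } }
  );
  if (res.modifiedCount === 0) {
    // Another replica already processed this change; discard it.
  }
}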

Yes, it creates 2 * n_application_replicas useless queries, so there is another option

Solution 2

  1. Create a collection of ResumeTokens in mongo which stores a collection -> token mapping
  2. In the changeStream handler code, after a successful write, update the ResumeToken in the collection
  3. Create a feature toggle that will disable reading ChangeStream in your application
  4. Configure only a single instance of your application to be a "reader"

In case of "reader" failure you might either enable reading on another node, or redeploy the "reader" node.

As a result, there can be any number of non-reader replicas and there won't be any useless queries
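
A minimal sketch of the "reader" side, persisting the resume token after each successful write and resuming from it on restart (the connection string, database, and collection names are assumptions):

const { MongoClient } = require("mongodb");

async function runReader() {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  const db = client.db("app");
  const tokens = db.collection("resumeTokens");

  // Resume from the last stored token, if any
  const saved = await tokens.findOne({ _id: "sites" });
  const stream = db.collection("sites").watch([], {
    fullDocument: "updateLookup",
    ...(saved ? { resumeAfter: saved.token } : {})
  });

  stream.on("change", async change => {
    // Do work...

    // After the write succeeds, record how far we got
    await tokens.updateOne(
      { _id: "sites" },
      { $set: { token: change._id } },
      { upsert: true }
    );
  });
}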

gasabr