Great question.
IMHO, a watch that emits delta changes is pretty useless without knowing the initial state those deltas apply to, so of course you need to do a find too. AFAIK there is no atomic "find and watch" command in MongoDB.
Even if we take the deltas out of the problem and only use the `fullDocument=updateLookup` option, there is still the problem of synchronisation. Consider this approach:
- `collection.watch()`
- `changeStream.pause()`
- `await collection.find().toArray()`
- `changeStream.resume()`
We need to set up the watch first so that we catch anything that happens while the find is running. At first glance, this looks good. If something happens while the find is running, the resulting change events wait in the paused stream and get processed when we resume. Since the watch started before the find, the worst-case scenario is that the stream contains some changes that occurred before the find, which can cause us to save stale state. This normally won't matter, because the stream will also contain the most recent update and will eventually save the correct state.
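To make that reasoning concrete, here is a small simulation of the watch/pause/find/resume sequence. It does not talk to MongoDB: `FakeChangeStream`, `applyChange` and `syncCache` are hypothetical names, and the fake only assumes that a paused stream buffers `change` events and replays them in order on resume. It shows a buffered event that predates the snapshot briefly overwriting newer state before the latest event corrects it.

```javascript
const { EventEmitter } = require('events');

// Hypothetical in-memory stand-in for a driver 3.x-style change stream:
// while paused, 'change' events are buffered; resume() replays them in order.
class FakeChangeStream extends EventEmitter {
  constructor() {
    super();
    this.paused = false;
    this.buffer = [];
  }
  push(change) {
    if (this.paused) this.buffer.push(change);
    else this.emit('change', change);
  }
  pause() {
    this.paused = true;
  }
  resume() {
    this.paused = false;
    while (!this.paused && this.buffer.length > 0) {
      this.emit('change', this.buffer.shift());
    }
  }
}

// Apply one change event (fullDocument=updateLookup style) to the local cache.
function applyChange(cache, change) {
  if (change.operationType === 'delete') cache.delete(change.documentKey._id);
  else cache.set(change.documentKey._id, change.fullDocument);
}

// watch -> pause -> find -> resume, as in the list above.
async function syncCache(stream, find) {
  const cache = new Map();
  stream.on('change', (change) => applyChange(cache, change)); // stands in for watch()
  stream.pause();
  for (const doc of await find()) cache.set(doc._id, doc); // snapshot from the find
  stream.resume(); // replay everything buffered during the find
  return cache;
}

// Two updates land while the find is in flight; the snapshot already has v: 2.
const stream = new FakeChangeStream();
const update = (v) => ({
  operationType: 'update',
  documentKey: { _id: 'a' },
  fullDocument: { _id: 'a', v },
});
syncCache(stream, async () => {
  stream.push(update(1));
  stream.push(update(2));
  return [{ _id: 'a', v: 2 }];
}).then((cache) => console.log(cache.get('a').v)); // prints 2 (stale v: 1 was applied briefly)
```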
But did the watch actually start on the MongoDB cluster before the find ran?
And did the find query even go to a node in the cluster that is properly synchronised?
Because of the unified topology and connection pooling, I don't think we can possibly know the answers to these questions. We don't know which connection each of the commands above will ultimately use. If MongoDB is offline, the client buffers commands like watch and find and releases them when a connection in the pool becomes available. But since multiple connections are in play (any of which may choke at just the wrong time), there is no guarantee that the watch command will reach the server before the find command. So I'm pretty sure this approach will work 99% of the time, but it isn't 100% reliable.
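The failure mode is easier to see on a timeline. The following is a toy model, not driver code (`finalCachedValue` is a hypothetical name): it assumes a find returns the state as of one moment and a change stream delivers only the writes made after the moment it is registered on the server. If the watch is registered before the find, the cache converges; if it is registered after, a write that lands in between is never seen.

```javascript
// Hypothetical timeline model (not driver code). Each write has a server-side
// time `t`. The find returns the latest write at or before `findAt`; a change
// stream registered at `watchAt` only delivers writes made after `watchAt`.
function finalCachedValue(writes, findAt, watchAt) {
  let value;
  for (const w of writes) if (w.t <= findAt) value = w.value; // snapshot from find()
  for (const w of writes) if (w.t > watchAt) value = w.value; // replayed change events
  return value;
}

const writes = [
  { t: 1, value: 'a' },
  { t: 2, value: 'b' },
  { t: 3, value: 'c' },
];

// Watch registered at t = 1, find at t = 2: the cache converges on the latest write.
console.log(finalCachedValue(writes, 2, 1)); // prints c
// Find at t = 2, but the watch only registers at t = 3.5: the write at t = 3 is lost.
console.log(finalCachedValue(writes, 2, 3.5)); // prints b
```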
You could try to confirm that the changeStream is connected by waiting for the `resumeTokenChanged` event and only then doing the find. But if your cluster is having issues where some nodes' oplogs are taking a while to sync, you might still `find()` old data and start watching too late, which means missed updates.
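A minimal helper for the "wait for `resumeTokenChanged`, then find" idea, with a timeout so we don't wait forever. `waitForEvent` is a hypothetical name; the demo uses a plain `EventEmitter` in place of a real change stream, and the commented driver usage assumes the 3.x Node driver's `collection.watch(pipeline, options)` and its `resumeTokenChanged` event. As the paragraph above says, a fresh token only shows the stream is connected; it does not prove the node serving the find is caught up.

```javascript
const { EventEmitter } = require('events');

// Resolve with the first `eventName` payload from `emitter`, or reject after `ms`.
function waitForEvent(emitter, eventName, ms) {
  return new Promise((resolve, reject) => {
    const onEvent = (payload) => {
      clearTimeout(timer);
      resolve(payload);
    };
    const timer = setTimeout(() => {
      emitter.removeListener(eventName, onEvent);
      reject(new Error(`timed out waiting for "${eventName}"`));
    }, ms);
    emitter.once(eventName, onEvent);
  });
}

// Hypothetical usage against the 3.x driver (needs a live server, not run here):
//   const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
//   changeStream.on('change', handleChange);
//   await waitForEvent(changeStream, 'resumeTokenChanged', 5000);
//   const docs = await collection.find().toArray();

// Demo: a plain EventEmitter stands in for the change stream.
const fakeStream = new EventEmitter();
waitForEvent(fakeStream, 'resumeTokenChanged', 1000)
  .then((token) => console.log('fresh:', token._data)); // prints "fresh: abc"
fakeStream.emit('resumeTokenChanged', { _data: 'abc' });
```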
Update
I think the best we can hope for is a combination of everything above, plus a delay based on your desired grace period for cluster synchronisation:
- `collection.watch()`
  - Wait until the changeStream is fresh by listening for the `resumeTokenChanged` event. This also means we are connected.
  - Set up a timeout that considers the changeStream stale if we don't receive `resumeTokenChanged` in time. In that case our data could be stale too.
  - Wait out the grace period for cluster node synchronisation.
- `changeStream.pause()`
  - If we still haven't received any `change` events from the changeStream by then, `await collection.find().toArray()`.
- `changeStream.resume()`
  - If the changeStream emits the `error` event, start the whole process again.
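Putting the steps together, here is a sketch of the whole procedure. It is only one possible reading of the list above: `findAndWatch`, its options (`tokenTimeoutMs`, `graceMs`, `maxAttempts`) and the `sleep`/`waitForEvent` helpers are hypothetical; `openStream` is assumed to return a driver 3.x-style change stream with `pause()`, `resume()` and `close()`; and treating change events that arrive during the grace period as a reason to retry is an assumption, since the list doesn't say what to do in that case.

```javascript
// Resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Resolve on the first `eventName` from `emitter`, or reject after `ms`.
function waitForEvent(emitter, eventName, ms) {
  return new Promise((resolve, reject) => {
    const onEvent = (payload) => {
      clearTimeout(timer);
      resolve(payload);
    };
    const timer = setTimeout(() => {
      emitter.removeListener(eventName, onEvent);
      reject(new Error(`timed out waiting for "${eventName}"`));
    }, ms);
    emitter.once(eventName, onEvent);
  });
}

// Hypothetical sketch: `openStream()` returns an EventEmitter-style change stream
// with pause/resume/close; `find()` resolves to the snapshot documents.
async function findAndWatch({
  openStream, find, onChange,
  tokenTimeoutMs = 5000, graceMs = 1000, maxAttempts = 5,
}) {
  for (let attempt = 1; ; attempt++) {
    const stream = openStream(); // collection.watch()
    let rejectOnError;
    const failed = new Promise((_, reject) => { rejectOnError = reject; });
    failed.catch(() => {}); // observed through Promise.race below
    stream.on('error', rejectOnError); // an 'error' event restarts the process
    let earlyChanges = 0;
    const countEarly = () => { earlyChanges += 1; };
    stream.on('change', countEarly);
    try {
      // Fresh (and connected) once resumeTokenChanged fires; stale on timeout.
      await Promise.race([waitForEvent(stream, 'resumeTokenChanged', tokenTimeoutMs), failed]);
      await Promise.race([sleep(graceMs), failed]); // grace period for node sync
      stream.pause();
      stream.removeListener('change', countEarly);
      // Assumption: changes during the grace period mean "not settled", so retry.
      if (earlyChanges > 0) throw new Error('changes arrived during the grace period');
      const docs = await Promise.race([find(), failed]); // collection.find().toArray()
      stream.on('change', onChange);
      stream.resume(); // deliver changes buffered during the find
      // From here on the caller must handle 'error' (e.g. by calling findAndWatch again).
      stream.removeListener('error', rejectOnError);
      return { stream, docs };
    } catch (err) {
      stream.removeAllListeners('change');
      Promise.resolve(stream.close()).catch(() => {});
      if (attempt >= maxAttempts) throw err;
    }
  }
}
```

With the real driver, `openStream` would be something like `() => collection.watch([], { fullDocument: 'updateLookup' })` and `find` would be `() => collection.find().toArray()`.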