5

I have a service that needs to watch a collection on a Mongo DB to create changes in the system. I have managed to establish a connection to a replica set using the C# driver and I'm using the following code to test the change stream.

public async Task WatchLoopAsync()
{
    var options = new ChangeStreamOptions
    {
        FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
    };
            
    using (var cursor = await _collection.WatchAsync(options))
    {
        _logger.LogInformation("Watching collection {String}", 
            _deployments.CollectionNamespace);
                
        await cursor.ForEachAsync(changeStreamDocument =>
        {
            var document = changeStreamDocument.FullDocument;
            _logger.LogInformation("Received document: {String}", 
                document.ToString());
        });
    }
}

The first log appears stating that it is watching the collection with the correct namespace. I then add a document to the collection expecting to see something log as "Received document: ..." but nothing logs.

I followed the async pattern given in the documentation here.

jamesnet214
  • 1,044
  • 13
  • 21
tvandinther
  • 371
  • 3
  • 13

2 Answers2

2

give the following a try:

using (var cursor = await _collection.WatchAsync(options))
{
    _logger.LogInformation("Watching collection {String}", _collection.CollectionNamespace);

    while (await cursor.MoveNextAsync())
    {
        foreach (var csd in cursor.Current)
        {
            switch (csd.OperationType)
            {
                case ChangeStreamOperationType.Insert:
                case ChangeStreamOperationType.Update:
                case ChangeStreamOperationType.Replace:
                    var document = csd.FullDocument;
                    _logger.LogInformation("Modified document: {String}", document.ToString());
                    break;
                case ChangeStreamOperationType.Delete:
                    var id = csd.DocumentKey["_id"].ToString();
                    _logger.LogInformation("Deleted document: {String}", id);
                    break;
                case ChangeStreamOperationType.Invalidate:
                    _logger.LogInformation("collection dropeed or renamed")
                    break;
            }
        }
    }

if you'd like a simpler implementation for changestreams, have a look at this

Dĵ ΝιΓΞΗΛψΚ
  • 5,068
  • 3
  • 13
  • 26
  • Thanks for your response. I've tried your suggestion as well as the Entities library without any luck. I'm wondering if there is perhaps an issue with the database connections. I'm using environment variables to form my connections so the write and change stream processes should be using the same connection strings. – tvandinther Jul 11 '21 at 22:17
0

The solution to my particular problem was that the change stream receiving code was throwing an exception when it couldn't match a property "name" to an object property of "Name" due to the property casing conventions in C#.

To fix this, I used the following code snippet:

var conventionPack = new ConventionPack
{
    new CamelCaseElementNameConvention()
};
ConventionRegistry.Register("camelCase", conventionPack, t => true);

This registers the DB driver to use the camel casing conventions when mapping entities to BSON.

tvandinther
  • 371
  • 3
  • 13