1

I have a service that spawns off a number of Changefeeds to monitor a number of different Cosmos DB collections. In v1 or 2,the ChangefeedObserver class included the ChangefeedObserverContext from which I could extract the collection name from.

  public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> deltas, CancellationToken cancellationToken)
    {
        string observerCollection = string.Empty;
        try
        {
            Regex rx = new Regex(@"\b(\w+$)");
            observerCollection = rx.Match(context.FeedResponse.ContentLocation).Value.ToString();

In v3, instead of passing a type as you processor, you pass it a delegate method whose signature no longer contains the context

MS Docs Container.ChangesHandlerDelegate

Change Feed Processor

    private ChangeFeedProcessor ChangeFeedInitialize(Container leasingContainer, Container monitoringContainer, string hostName)
    {
        ChangeFeedProcessor changeFeedProcessor = monitoringContainer
            .GetChangeFeedProcessorBuilder<Document>(hostName, this.HandleChangesAsync)
                .WithInstanceName("isn")
                .WithLeaseContainer(leasingContainer)
                .Build();

        return changeFeedProcessor;
    }

    private async Task HandleChangesAsync(IReadOnlyCollection<Document> changes, CancellationToken cancellationToken)
    {
        ILogger logger = AnalyticsHelper.BuildMeMyLogger(this.loggerFactory);

        try
        {
            AnalyticsChangefeedProcessor changefeedProcessor = new AnalyticsChangefeedProcessor();
            await changefeedProcessor.HandleChangesAsync(changes, this.analyticsContext.DataLakeStorageProvider, "CollectionName", logger);
        }
        catch (Exception ex)
        {
            logger.LogFailure($"Failed to process changes: {ex.Message}", TagIds.ExceptionAnalytics, ex);
        }
    }

In the above coded, I have a basic method that creates the Changefeed (gets started via a timer), and the the delegate method that sends the processing off to a larger class to take actions, depending on the monitored collection.

So, how can I get this changefeeds Monitored Collection value into the ChangesHander?

Tdawg90
  • 103
  • 1
  • 10

1 Answers1

1

You already have the reference, you can inject it or reference it.

private ChangeFeedProcessor ChangeFeedInitialize(Container leasingContainer, Container monitoringContainer, string hostName)
{
    ChangeFeedProcessor changeFeedProcessor = monitoringContainer
        .GetChangeFeedProcessorBuilder<Document>(hostName, 
                (IReadOnlyCollection<Document> changes, CancellationToken cancellationToken) => 
                    this.HandleChangesAsync(monitoringContainer, changes, cancellationToken))
            .WithInstanceName("isn")
            .WithLeaseContainer(leasingContainer)
            .Build();

    return changeFeedProcessor;
}

private async Task HandleChangesAsync(Container monitoringContainer, IReadOnlyCollection<Document> changes, CancellationToken cancellationToken)
{
    ILogger logger = AnalyticsHelper.BuildMeMyLogger(this.loggerFactory);

    try
    {
        AnalyticsChangefeedProcessor changefeedProcessor = new AnalyticsChangefeedProcessor();
        await changefeedProcessor.HandleChangesAsync(changes, this.analyticsContext.DataLakeStorageProvider, "CollectionName", logger);
    }
    catch (Exception ex)
    {
        logger.LogFailure($"Failed to process changes: {ex.Message}", TagIds.ExceptionAnalytics, ex);
    }
}
Matias Quaranta
  • 13,907
  • 1
  • 22
  • 47