2

Cant prevent multiple consumer from processing the same record in queue

using System.Threading.Channels library, when writer enqueues some model, one of multiple consumers starts to process it. during this, writer goes to the database and reads the same record(because it is still being processed and status is not updated) which results in another consumer starting to process the same model. I have implemented ConcurrentDictionary in order to prevent the problem, but it can not help. Any ideas on how to solve this problem?

Here is the code:

public sealed class SendImagesBackgroundService : BackgroundTask
{
    private readonly ILogger<SendImagesBackgroundService> _logger;
    private readonly IServiceProvider _serviceProvider;
    private readonly IApiService _apiService;
    private readonly Channel<InspectionFileModel> _channel;
    private readonly SendImagesBackgroundServiceOptions _options;
    private static readonly ConcurrentDictionary<string, bool>
        s_concurrentDictionary = new();

    public SendImagesBackgroundService(
        ILogger<SendImagesBackgroundService> logger,
        IServiceProvider serviceProvider,
        IApiService apiService,
        IOptions<SendImagesBackgroundServiceOptions> options,
        IBackgroundTaskLockProvider lockProvider) : base(logger)
    {
        UseExclusiveLock(lockProvider);

        _options = options.Value;
        _logger = logger;
        _serviceProvider = serviceProvider;
        _apiService = apiService;

        _channel = Channel.CreateBounded<InspectionFileModel>(
            new BoundedChannelOptions(_options.ChannelQueueSize));
    }

    protected override Task ExecuteAsync(CancellationToken cancellationToken)
    {
        Task.Run(() => FetchImages(cancellationToken), cancellationToken);
        Task.Run(() => StartProcessingInspectionImages(cancellationToken),
            cancellationToken);
        return Task.CompletedTask;
    }

    private async Task FetchImages(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            using var serviceScope = _serviceProvider.CreateScope();
            List<InspectionFileModel>? inspectionImages = default;
            do
            {
                try
                {
                     var mediatr = serviceScope.ServiceProvider
                         .GetRequiredService<IMediator>();
                     inspectionImages = await mediatr.Send(
                         new DownloadInspectionFilesQueryNew(),cancellationToken);
                     if (inspectionImages is not {Count: > 0})
                     {
                         continue;
                     }

                     var nonProcessingImages = inspectionImages.Where(
                         x => !s_concurrentDictionary.ContainsKey(x.Id));

                     foreach (var image in nonProcessingImages)
                     {
                         s_concurrentDictionary.TryAdd(image.Id, true);
                         while (!(cancellationToken.IsCancellationRequested &&
                             await _channel.Writer.WaitToWriteAsync(
                                 cancellationToken)))
                         {
                             if (!_channel.Writer.TryWrite(image))
                             {
                                 continue;
                             }

                             break;
                         }
                     }
                }
                catch (Exception ex) when (!(cancellationToken
                    .IsCancellationRequested && ex is OperationCanceledException))
                {
                    _logger.LogError(ex, "Fetching inspection images Failed");
                }

            } while (!cancellationToken.IsCancellationRequested
                && inspectionImages  is {Count: > 0});

            await Task.Delay( _options.DelayBetweenFetchBatchMs, cancellationToken);
        }

        _logger.LogInformation("End fetching inspection images");
    }

    private async Task StartProcessingInspectionImages(
        CancellationToken cancellationToken)
    {
        var parallelProcesses = new List<Task>();
        for (int i = 0; i <   _options.NumberOfParallelTasks; i++)
        {
            var task = Task.Run(() => ProcessInspectionImages(cancellationToken),
                cancellationToken);
            parallelProcesses.Add(task);
        }
        await Task.WhenAll(parallelProcesses);
    }

    private async Task ProcessInspectionImages(CancellationToken cancellationToken)
    {
        while (!(cancellationToken.IsCancellationRequested &&
                 await _channel.Reader.WaitToReadAsync(cancellationToken)))
        {
            while (!cancellationToken.IsCancellationRequested &&
                     _channel.Reader.TryRead(out var inspectionImage ))
            {
                try
                {
                    await SendInspectionImageToLivo(inspectionImage,
                        cancellationToken);
                }
                catch (Exception ex) when (!(cancellationToken
                    .IsCancellationRequested && ex is OperationCanceledException))
                {
                   //handle
                }
            }
        }
    }

    private async Task SendInspectionImageToLivo(InspectionFileModel image,
        CancellationToken cancellationToken)
    {
        try
        {
            //send data over the network
        }
        catch (ApiException ex)
        {
            //handle 
        }
        finally
        {
            s_concurrentDictionary.TryRemove(image.Id, out bool _);
        }
    }

    public override object? GetTelemetry() => null;
}

The status is updated in the SendInspectionImageToLivo method. If status code is 200, status field becomes successful. In case of 4** it's set to failed, and subsequent DB queries won't include them in the result, based on those statuses.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
tabsandze
  • 23
  • 3
  • *`public sealed class SendImagesBackgroundService : BackgroundTask`* -- What is the `BackgroundTask`? Is it built-in or custom? – Theodor Zoulias Feb 22 '23 at 09:23
  • @TheodorZoulias custom, implements IHostedService – tabsandze Feb 22 '23 at 09:29
  • 2
    Is it an option to solve this problem at the database level, by adding a `Processing` boolean field in the database? A consumer would start by updating this field to `true` (only if it is currently `false`), and in case the `recordsAffected = 1` then proceed with processing the record, otherwise skip it. `UPDATE Records SET Processing = 1 WHERE Id = @Id AND Processing = 0;` – Theodor Zoulias Feb 22 '23 at 09:54
  • @TheodorZoulias yes thats a valid option, but id like to manage it on code level – tabsandze Feb 22 '23 at 12:31
  • The issue with relying on code is that even if you write the code correctly, the moment that you accidentally start two instances of your application all bets are off. Doing it at the database level is safer. – Theodor Zoulias Feb 22 '23 at 12:59
  • UseExclusiveLock(lockProvider) method takes care of that, the process will only be run on one instance – tabsandze Feb 22 '23 at 13:03
  • *"because it is still being processed and status is not updated"* -- At which point in the pipeline is the status updated? Is it happening inside the `SendInspectionImageToLivo` method? – Theodor Zoulias Feb 22 '23 at 13:09
  • 1
    Yes, if status code is 200 status field becomes successful, in case of 4** its set to failed and subsequent db queries wont include them in the result, based on those statuses – tabsandze Feb 22 '23 at 13:15
  • What .NET platform are you targeting? .NET 7? – Theodor Zoulias Feb 22 '23 at 13:23
  • no im targeting net 6 – tabsandze Feb 22 '23 at 13:31
  • 1
    To be clear, there's nothing wrong with Channels here. The problem is that the producer is putting the same item in the channel more than once. So, you'll either need to mark the database record as processing (as suggested above) (note: there are complications with orphaned records), or change the channel to a [durable queue](https://blog.stephencleary.com/2021/01/asynchronous-messaging-2-durable-queues.html) and use the Outbox Pattern (i.e., as soon as the message is added to the queue, the db record is deleted or marked as handled). – Stephen Cleary Feb 22 '23 at 13:53
  • 1
    @tabsandze you're confusing different "concurrent" accesses. A Channel doesn't allow multiple consumers to get the same message. No ifs or buts. Unless your own application puts the same item in there multiple times. That's why people talk about *messages*, not objects or items. If you want to modify a record, the message should be an `Update Record 123 with X` operation, not the object itself – Panagiotis Kanavos Feb 22 '23 at 13:54
  • As for `writer goes to the database and reads the same record`, that means a better way to determine what to read is needed. Without knowing what data is loaded, based on what criteria, one can only guess. One option is to use the `UPDATE ... OUTPUT` clause in SQL Server or `RETURNING` in MariaDB/PostgreSQL to both flag a row *and* return its ID. Another option is to use [Change Tracking](https://learn.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-tracking-sql-server?view=sql-server-ver16) in SQL Server to retrieve new/changed rows – Panagiotis Kanavos Feb 22 '23 at 14:27

1 Answers1

1

I think that the key for solving your problem is to take a snapshot of the currently processed images, before fetching the inspectionImages via the DownloadInspectionFilesQueryNew command. Then filter the images based on the snapshot, and not based on the current state of the dictionary. Otherwise it is possible to start fetching the images with a X image stored in the dictionary, while the images are fetched the X image is processed and is removed from the dictionary, the X image is among the fetched images, and eventually the X image is not filtered and it is processed again. Using the snapshot eliminates this possibility.

Below is an alternative way to implement your service, that you might find interesting. It is based on the Parallel.ForEachAsync overload that takes an IAsyncEnumerable<T> as source. The source is an iterator method that fetches unprocessed images and yields them. I think that it is simpler and more readable than the Channel<T>-based implementation.

protected override Task ExecuteAsync(CancellationToken cancellationToken)
{
    ParallelOptions parallelOptions = new()
    {
        MaxDegreeOfParallelism = _options.NumberOfParallelTasks,
        CancellationToken = cancellationToken,
    };

    HashSet<string> processing = new();

    async IAsyncEnumerable<InspectionFileModel> Producer(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        while (true)
        {
            HashSet<string> processingSnapshot;
            lock (processing) processingSnapshot = new(processing);
            var inspectionImages = await mediatr.Send(
                new DownloadInspectionFilesQueryNew(), ct);
            var nonProcessingImages = inspectionImages
                .Where(x => !processingSnapshot.Contains(x.Id));
            int yieldedCount = 0;
            foreach (InspectionFileModel image in nonProcessingImages)
            {
                lock (processing) processing.Add(image.Id);
                yield return image; yieldedCount++;
            }
            if (yieldedCount == 0)
                await Task.Delay(1000, ct); // Take a small break.
        }
    }

    return Parallel.ForEachAsync(Producer(), parallelOptions, async (image, ct) =>
    {
        try
        {
            await SendInspectionImageToLivo(image, ct);
        }
        finally { lock (processing) processing.Remove(image.Id); }
    });
}

You might find that it has better behavior in case of unhandled exceptions too.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104