Cant prevent multiple consumer from processing the same record in queue
using System.Threading.Channels
library, when writer enqueues some model, one of multiple consumers starts to process it. during this, writer goes to the database and reads the same record(because it is still being processed and status is not updated) which results in another consumer starting to process the same model. I have implemented ConcurrentDictionary
in order to prevent the problem, but it can not help. Any ideas on how to solve this problem?
Here is the code:
public sealed class SendImagesBackgroundService : BackgroundTask
{
private readonly ILogger<SendImagesBackgroundService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IApiService _apiService;
private readonly Channel<InspectionFileModel> _channel;
private readonly SendImagesBackgroundServiceOptions _options;
private static readonly ConcurrentDictionary<string, bool>
s_concurrentDictionary = new();
public SendImagesBackgroundService(
ILogger<SendImagesBackgroundService> logger,
IServiceProvider serviceProvider,
IApiService apiService,
IOptions<SendImagesBackgroundServiceOptions> options,
IBackgroundTaskLockProvider lockProvider) : base(logger)
{
UseExclusiveLock(lockProvider);
_options = options.Value;
_logger = logger;
_serviceProvider = serviceProvider;
_apiService = apiService;
_channel = Channel.CreateBounded<InspectionFileModel>(
new BoundedChannelOptions(_options.ChannelQueueSize));
}
protected override Task ExecuteAsync(CancellationToken cancellationToken)
{
Task.Run(() => FetchImages(cancellationToken), cancellationToken);
Task.Run(() => StartProcessingInspectionImages(cancellationToken),
cancellationToken);
return Task.CompletedTask;
}
private async Task FetchImages(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
using var serviceScope = _serviceProvider.CreateScope();
List<InspectionFileModel>? inspectionImages = default;
do
{
try
{
var mediatr = serviceScope.ServiceProvider
.GetRequiredService<IMediator>();
inspectionImages = await mediatr.Send(
new DownloadInspectionFilesQueryNew(),cancellationToken);
if (inspectionImages is not {Count: > 0})
{
continue;
}
var nonProcessingImages = inspectionImages.Where(
x => !s_concurrentDictionary.ContainsKey(x.Id));
foreach (var image in nonProcessingImages)
{
s_concurrentDictionary.TryAdd(image.Id, true);
while (!(cancellationToken.IsCancellationRequested &&
await _channel.Writer.WaitToWriteAsync(
cancellationToken)))
{
if (!_channel.Writer.TryWrite(image))
{
continue;
}
break;
}
}
}
catch (Exception ex) when (!(cancellationToken
.IsCancellationRequested && ex is OperationCanceledException))
{
_logger.LogError(ex, "Fetching inspection images Failed");
}
} while (!cancellationToken.IsCancellationRequested
&& inspectionImages is {Count: > 0});
await Task.Delay( _options.DelayBetweenFetchBatchMs, cancellationToken);
}
_logger.LogInformation("End fetching inspection images");
}
private async Task StartProcessingInspectionImages(
CancellationToken cancellationToken)
{
var parallelProcesses = new List<Task>();
for (int i = 0; i < _options.NumberOfParallelTasks; i++)
{
var task = Task.Run(() => ProcessInspectionImages(cancellationToken),
cancellationToken);
parallelProcesses.Add(task);
}
await Task.WhenAll(parallelProcesses);
}
private async Task ProcessInspectionImages(CancellationToken cancellationToken)
{
while (!(cancellationToken.IsCancellationRequested &&
await _channel.Reader.WaitToReadAsync(cancellationToken)))
{
while (!cancellationToken.IsCancellationRequested &&
_channel.Reader.TryRead(out var inspectionImage ))
{
try
{
await SendInspectionImageToLivo(inspectionImage,
cancellationToken);
}
catch (Exception ex) when (!(cancellationToken
.IsCancellationRequested && ex is OperationCanceledException))
{
//handle
}
}
}
}
private async Task SendInspectionImageToLivo(InspectionFileModel image,
CancellationToken cancellationToken)
{
try
{
//send data over the network
}
catch (ApiException ex)
{
//handle
}
finally
{
s_concurrentDictionary.TryRemove(image.Id, out bool _);
}
}
public override object? GetTelemetry() => null;
}
The status is updated in the SendInspectionImageToLivo
method. If status code is 200, status field becomes successful. In case of 4** it's set to failed, and subsequent DB queries won't include them in the result, based on those statuses.