I work with some Wi-Fi devices such as cameras.
The basic flow that I implemented:
- Someone presses a button.
- The button calls my Web API endpoint.
- My Web API endpoint calls one of the camera's APIs (via HttpRequest).
- Processing each request takes 5 seconds, and there should be a 1-second delay between requests. For instance, if you press the button twice, one second apart, we expect 5 seconds of processing for the first press, then a 1-second delay, and finally 5 seconds of processing for the second press.
To do that I am using Queued background tasks in a fire-and-forget manner in a .NET Core 3.1 project, and it works fine when I am dealing with just one camera.
But the new requirement of the project is that the background task should handle multiple cameras. That means one queue per camera, and the queues should run in parallel, each following the flow I described above.
For example, suppose we have 2 devices, camera-001 and camera-002, and 2 connected buttons, btn-cam-001 and btn-cam-002, and the buttons are pressed in this order (with a 0.5-second delay after each press): 2x btn-cam-001, then 1x btn-cam-002.
What really happens is FIFO: first the requests from btn-cam-001 are processed, and only then the request from btn-cam-002.
What I expect and need: camera-002 should not have to wait to receive its request, and the first requests to both cameras (001/002) should be processed at the same time (based on the example), as if each camera had its own queue and its own processing loop.
The question is: how can I achieve that in .NET Core 3.1? I'd appreciate any help.
My current background service:
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    public class QueuedHostedService : BackgroundService
    {
        public IBackgroundTaskQueue TaskQueue { get; }
        private readonly ILogger _logger;

        public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory)
        {
            TaskQueue = taskQueue;
            _logger = loggerFactory.CreateLogger<QueuedHostedService>();
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Queued Hosted Service is starting.");

            // Single consumer loop: work items run strictly one after another (FIFO).
            while (!cancellationToken.IsCancellationRequested)
            {
                var workItem = await TaskQueue.DequeueAsync(cancellationToken);
                try
                {
                    await workItem(cancellationToken);
                }
                catch (Exception exception)
                {
                    _logger.LogError(exception, "Error occurred executing {WorkItem}.", nameof(workItem));
                }
            }

            _logger.LogInformation("Queued Hosted Service is stopping.");
        }
    }
And the current BackgroundTaskQueue:
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        // Counts the queued items; DequeueAsync waits on it until work is available.
        private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
        private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
            new ConcurrentQueue<Func<CancellationToken, Task>>();

        public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
        {
            if (workItem is null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }

            _workItems.Enqueue(workItem);
            _signal.Release();
        }

        public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);
            _workItems.TryDequeue(out var workItem);
            return workItem;
        }
    }
My current endpoint:
    [HttpPost("hit")]
    public ActionResult TurnOnAsync([FromBody] HitRequest request, CancellationToken cancellationToken = default)
    {
        try
        {
            var camera = ConfigurationHelper.GetAndValidateCamera(request.Device, _configuration);

            // Use the token supplied by the background service, not the request's token:
            // the HTTP request has already returned by the time this work item runs.
            _taskQueue.QueueBackgroundWorkItem(async stoppingToken =>
            {
                await _cameraRelayService.TurnOnAsync(request.Device, stoppingToken);

                // 1-second gap before the next request, without blocking a thread.
                await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
            });

            return Ok();
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, "Error when turning on the lamp {DeviceName}.", request.Device);
            return StatusCode(StatusCodes.Status500InternalServerError, exception.Message);
        }
    }
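
To make the expected behaviour concrete, the "one queue per camera" idea described above might be sketched roughly as follows. This is only an illustration under assumptions, not working code from my project: PerDeviceTaskQueue and its members are hypothetical names, it relies on System.Threading.Channels (which ships with .NET Core 3.x), and it writes errors to Console.Error where a real service would use ILogger.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: one unbounded channel and one consumer loop per device id.
public class PerDeviceTaskQueue
{
    // Lazy<T> ensures that only one channel (and one worker) is created per device,
    // even if two requests for the same new device race inside GetOrAdd.
    private readonly ConcurrentDictionary<string, Lazy<Channel<Func<CancellationToken, Task>>>> _queues =
        new ConcurrentDictionary<string, Lazy<Channel<Func<CancellationToken, Task>>>>();

    private readonly TimeSpan _delayBetweenItems;
    private readonly CancellationToken _stoppingToken;

    public PerDeviceTaskQueue(TimeSpan delayBetweenItems, CancellationToken stoppingToken)
    {
        _delayBetweenItems = delayBetweenItems;
        _stoppingToken = stoppingToken;
    }

    public void Enqueue(string device, Func<CancellationToken, Task> workItem)
    {
        if (device is null) throw new ArgumentNullException(nameof(device));
        if (workItem is null) throw new ArgumentNullException(nameof(workItem));

        var channel = _queues.GetOrAdd(
            device,
            _ => new Lazy<Channel<Func<CancellationToken, Task>>>(StartWorker)).Value;

        // Writing to an unbounded channel succeeds unless the channel has been completed.
        channel.Writer.TryWrite(workItem);
    }

    private Channel<Func<CancellationToken, Task>> StartWorker()
    {
        var channel = Channel.CreateUnbounded<Func<CancellationToken, Task>>(
            new UnboundedChannelOptions { SingleReader = true });

        // Each device gets its own consumer loop, so devices do not block each other.
        _ = Task.Run(() => ConsumeAsync(channel.Reader));
        return channel;
    }

    private async Task ConsumeAsync(ChannelReader<Func<CancellationToken, Task>> reader)
    {
        try
        {
            while (await reader.WaitToReadAsync(_stoppingToken))
            {
                while (reader.TryRead(out var workItem))
                {
                    try
                    {
                        await workItem(_stoppingToken);
                    }
                    catch (Exception ex) when (!(ex is OperationCanceledException))
                    {
                        Console.Error.WriteLine(ex);
                    }

                    // Gap between two consecutive requests to the same device.
                    await Task.Delay(_delayBetweenItems, _stoppingToken);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // The stopping token was cancelled; let the worker end quietly.
        }
    }
}
```

With something like this registered as a singleton, the endpoint would call Enqueue(request.Device, ...) instead of QueueBackgroundWorkItem(...), so requests for camera-001 and camera-002 land in separate queues while requests for the same camera still run one after another. The stopping token could, for instance, be taken from IHostApplicationLifetime.ApplicationStopping.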