I work with some Wi-Fi devices such as cameras.

The basic flow that I implemented:

  1. Someone presses a button.
  2. The button calls my Web API endpoint.
  3. My Web API endpoint calls one of the camera's APIs (via an HTTP request).
  4. Processing each request takes 5 seconds, and there should be a 1-second delay between requests. For instance, if you press the button twice, one second apart, we expect 5 seconds of processing for the first press, then a 1-second delay, and finally 5 seconds of processing for the second press.

To do that, I am using queued background tasks in a fire-and-forget manner in a .NET Core 3.1 project, and it works fine when I am dealing with just one camera.

But the new requirement of the project is that the background task should handle multiple cameras. That means one queue per camera, and the queues should work in parallel, each following the flow that I described above.

For example, suppose we have 2 devices, camera-001 and camera-002, and 2 connected buttons, btn-cam-001 and btn-cam-002, and the buttons are pressed in this order (with a 0.5-second delay after each press): 2× btn-cam-001, then 1× btn-cam-002.

What really happens is FIFO: first the requests of btn-cam-001 are processed, and then the request of btn-cam-002.

What I expect and need: camera-002 should not have to wait to receive its request, and the first requests to both cameras (001 and 002) should be processed at the same time (based on the example), as if each camera had its own queue and its own process.

The question is: how can I achieve that in .NET Core 3.1? I appreciate any help.

My current background service:

public class QueuedHostedService : BackgroundService
{
    public IBackgroundTaskQueue TaskQueue { get; }

    private readonly ILogger _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        while (!cancellationToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(cancellationToken);

            try
            {
                await workItem(cancellationToken);
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, $"Error occurred executing {nameof(workItem)}.");
            }
        }

        _logger.LogInformation("Queued Hosted Service is stopping.");
    }
}

And the current BackgroundTaskQueue:

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
        new ConcurrentQueue<Func<CancellationToken, Task>>();

    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);

        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);

        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}

My current endpoint:

    [HttpPost("hit")]
    public ActionResult TurnOnAsync([FromBody] HitRequest request, CancellationToken cancellationToken = default)
    {
        try
        {
            var camera = ConfigurationHelper.GetAndValidateCamera(request.Device, _configuration);

            _taskQueue.QueueBackgroundWorkItem(async x =>
                {
                    await _cameraRelayService.TurnOnAsync(request.Device, cancellationToken);

                    Thread.Sleep(TimeSpan.FromSeconds(1));

                });

            return Ok();
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, "Error when turning on the lamp {DeviceName}.", request.Device);

            return StatusCode(StatusCodes.Status500InternalServerError, exception.Message);
        }
    }
peyman gilmour
  • Can you show the code where you are currently enforcing the 1 sec delay? Are you enforcing it before calling the `TaskQueue.QueueBackgroundWorkItem` method? – Theodor Zoulias May 21 '20 at 18:33
  • As a side note, the `ExecuteAsync` method seems to behave inconsistently in case of cancellation. The returned `Task` can either transition to a `Canceled` state (if the cancellation happens during any of the `await` stages), or to a `RanToCompletion` state (if the cancellation happens during the `IsCancellationRequested` check in the `while` loop). – Theodor Zoulias May 21 '20 at 18:34
  • @TheodorZoulias Thanks for replying. I updated the post. – peyman gilmour May 21 '20 at 18:39
  • Instead of `Thread.Sleep(TimeSpan.FromSeconds(1));` you could consider the more lightweight `await Task.Delay(TimeSpan.FromSeconds(1));`, to avoid blocking a `ThreadPool` thread. – Theodor Zoulias May 21 '20 at 18:50
  • @TheodorZoulias Thanks for the great tips. I will apply all... And Task.Delay() is what I wanted to do. – peyman gilmour May 21 '20 at 19:04
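
The `Task.Delay` suggestion from the comments above could look roughly like the following. This is only a sketch: `CameraWork.Create` and the `turnOnAsync` delegate are hypothetical stand-ins for the endpoint's lambda and for `_cameraRelayService.TurnOnAsync`. It also uses the token handed in by the queue rather than the request's token, on the assumption that the work should follow the lifetime of the background service and not of the HTTP request.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CameraWork
{
    // Builds a work item in the shape the queue expects: Func<CancellationToken, Task>.
    // turnOnAsync is a hypothetical stand-in for _cameraRelayService.TurnOnAsync.
    public static Func<CancellationToken, Task> Create(
        string device, Func<string, CancellationToken, Task> turnOnAsync)
    {
        return async token =>
        {
            await turnOnAsync(device, token);

            // Task.Delay frees the thread while waiting and observes cancellation,
            // whereas Thread.Sleep would block a thread-pool thread for the full second.
            await Task.Delay(TimeSpan.FromSeconds(1), token);
        };
    }
}
```

In the endpoint this might be used as `_taskQueue.QueueBackgroundWorkItem(CameraWork.Create(request.Device, _cameraRelayService.TurnOnAsync))`, assuming the service method has a matching signature.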

1 Answer

Instead of a single BackgroundTaskQueue you could have one per camera. You could store the queues in a dictionary, having the camera as the key:

public IDictionary<IDevice, IBackgroundTaskQueue> TaskQueues { get; }

Then in your end-point use the queue that is associated with the requested camera:

_taskQueues[camera].QueueBackgroundWorkItem(async x =>
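
One way the background service could drain such a dictionary is to run one independent consumer loop per entry, so the service never needs to be told which camera a request belongs to. The following is a minimal sketch under stated assumptions: queues are keyed by a device name string, the set of cameras is fixed at startup, and `SimpleQueue` and `MultiQueueWorker` are hypothetical names (`SimpleQueue` just repeats the question's `BackgroundTaskQueue` so that the sketch compiles on its own).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's BackgroundTaskQueue.
public class SimpleQueue
{
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _items =
        new ConcurrentQueue<Func<CancellationToken, Task>>();

    public void Enqueue(Func<CancellationToken, Task> workItem)
    {
        _items.Enqueue(workItem ?? throw new ArgumentNullException(nameof(workItem)));
        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(CancellationToken token)
    {
        await _signal.WaitAsync(token);
        _items.TryDequeue(out var workItem);
        return workItem;
    }
}

// Hypothetical worker: one sequential consumer loop per camera, all loops running concurrently.
public class MultiQueueWorker
{
    private readonly IReadOnlyDictionary<string, SimpleQueue> _queues;

    public MultiQueueWorker(IReadOnlyDictionary<string, SimpleQueue> queues) => _queues = queues;

    // In a BackgroundService, this would be the body of ExecuteAsync.
    public Task RunAsync(CancellationToken token) =>
        Task.WhenAll(_queues.Select(pair => ConsumeAsync(pair.Key, pair.Value, token)));

    private static async Task ConsumeAsync(string camera, SimpleQueue queue, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var workItem = await queue.DequeueAsync(token);
            try
            {
                await workItem(token);
            }
            catch (Exception exception) when (!token.IsCancellationRequested)
            {
                // A failing item is reported and skipped; the loop keeps serving this camera.
                Console.Error.WriteLine($"Work item for {camera} failed: {exception.Message}");
            }
        }
    }
}
```

Within one camera the items still run strictly one after another, while a slow item on one camera does not hold up the others. The dictionary itself would be registered as the singleton, rather than a single queue.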
Theodor Zoulias
  • Thanks for replying. I tried roughly two ways of applying the changes that you mentioned, and both were unsuccessful. The first way was to apply all these changes in the controller. But since IBackgroundTaskQueue is a singleton, I got the same result. – peyman gilmour May 22 '20 at 09:16
  • The other way left me quite confused: an IDictionary in the background service .cs file instead of `IBackgroundTaskQueue TaskQueue { get; }`. The issue is, how does the service know which camera to use as the key? I need to retrieve the right BackgroundTaskQueue from the dictionary in order to dequeue and execute the task. Could you please provide an example? (I more or less followed this link: https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.1&tabs=visual-studio#queued-background-tasks) – peyman gilmour May 22 '20 at 09:16
  • @peymangilmour I guess that you know how many cameras you have when the program starts. You can populate the dictionary with the cameras when the service starts: `TaskQueues.Add(camera1, new BackgroundTaskQueue())` etc. Btw the `BackgroundTaskQueue` class seems like a quite primitive implementation of an async queue. If I were in your place I would prefer to use the [`Channels`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels). There are [other options](https://stackoverflow.com/questions/21225361/is-there-anything-like-asynchronous-blockingcollectiont) as well. – Theodor Zoulias May 22 '20 at 14:11
  • Yes, agree ... I will try to use Channels instead. Thanks again. – peyman gilmour May 22 '20 at 16:22
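
For reference, the Channels idea from the comments above could be sketched as follows. This is not code from the thread: `CameraChannels` and its members are hypothetical names, cameras are assumed to be identified by a name string and known at startup, and the gap between items is a constructor parameter so that it can be set to the one second from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical per-camera dispatcher built on System.Threading.Channels.
public class CameraChannels
{
    private readonly Dictionary<string, Channel<Func<CancellationToken, Task>>> _channels;
    private readonly TimeSpan _gap;

    public CameraChannels(IEnumerable<string> cameraNames, TimeSpan gap)
    {
        _gap = gap;
        // Assumption: the dictionary is filled once here and only read afterwards.
        _channels = cameraNames.ToDictionary(
            name => name,
            _ => Channel.CreateUnbounded<Func<CancellationToken, Task>>(
                new UnboundedChannelOptions { SingleReader = true }));
    }

    // Called from the endpoint; returns false for an unknown camera.
    public bool TryEnqueue(string camera, Func<CancellationToken, Task> workItem) =>
        _channels.TryGetValue(camera, out var channel) && channel.Writer.TryWrite(workItem);

    // One consumer per channel; in a BackgroundService this would be the body of ExecuteAsync.
    public Task RunAsync(CancellationToken token) =>
        Task.WhenAll(_channels.Values.Select(channel => ConsumeAsync(channel.Reader, token)));

    private async Task ConsumeAsync(
        ChannelReader<Func<CancellationToken, Task>> reader, CancellationToken token)
    {
        await foreach (var workItem in reader.ReadAllAsync(token))
        {
            try
            {
                await workItem(token);
                await Task.Delay(_gap, token); // pause before this camera's next item
            }
            catch (Exception exception) when (!token.IsCancellationRequested)
            {
                Console.Error.WriteLine($"Work item failed: {exception.Message}");
            }
        }
    }
}
```

With this shape the endpoint would only call something like `TryEnqueue(request.Device, token => _cameraRelayService.TurnOnAsync(request.Device, token))`, and the delay moves out of the work item into the consumer loop.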