Recently, I had a requirement to queue async tasks and was introduced to BlockingCollection in this link: Queuing asynchronous task in C#. It worked, but my requirement has changed slightly and I need your guidance. I'm using the BlockingCollection as in @Stephen Cleary's answer.

This is the BlockingCollection-based class from that link:

public sealed class ExecutionQueue
{
  //private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();//commented this
  private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();

  public ExecutionQueue() => Completion = Task.Run(() => ProcessQueueAsync());

  public Task Completion { get; }

  public void Complete() => _queue.CompleteAdding();

  private async Task ProcessQueueAsync()
  {
    foreach (var value in _queue.GetConsumingEnumerable())
      await value();
  }
}

//public Task Run(Func<Task> lambda)
public Task Run(Task lambda)
{
  var tcs = new TaskCompletionSource<object>();
  _queue.Add(lambda);
  return tcs.Task;
}
  1. I need to queue certain database tasks that sit inside a regular void method. I may not be able to change the signature of this method. How do I queue them?
 public static ExecutionQueue taskQueue = new ExecutionQueue();

 private void SaveValesToDB(...)
 {
    var item = GetID(...);
    ...
    taskQueue.Run(Task.Run(() =>
    {
       DBInstance.DBSaveValue1(...); // is it correct to wrap with Task.Run and add to queue? it should be queued and run asynchronously
    }));
    ...
 }
  2. We save and retrieve data from the DB on and off. So when we queue a DB call that returns something, like a getter, we want to ensure that no other queued items are processed until we receive the return value.
private void SaveValesToDB(...)
{
 ...
 taskQueue.Run(Task.Run(() =>
 {
    DBInstance.DBSaveValue1(...); // is this correct? it should be queued and run asynchronously
 }));
 ...
 taskQueue.Run(Task.Run(() =>
 {
    var result1 = DBInstance.DBGetValue2(...); // should be queued and run asynchronously; 
    LogData(result1);// not a DB call but believe it should be wrapped in here for the result1, correct?
 }));

 /* So in the above Task.Run, I want to ensure that until I receive result1
 I don't process other items in the queue, even
 if they are added. How can I do that?
 The main thread should continue. */
 ...
 var result2 = DBInstance.DBGetValue3(...); // should be queued and run asynchronously

 UpdateAdvancedLod(result1 +" "+result2);// here, should i block main thread until i get result1 ?
}

  3. How do I handle errors?

Please, guide me.

Edited:

If I use Func<Task> in public Task Run(Func<Task> lambda), is the code below correct?

taskQueue.Run(async () =>
                {
                    await Task.Run(() =>
                    {
                        DBInstance.DBSaveValue1(...);//is this correct
                    });
                }
                );
jamilia
  • Why did you change the signature of the `Run` method from `public Task Run(Func<Task> lambda)` to `public Task Run(Task task)`? This change totally breaks the intended functionality of the `ExecutionQueue` class. The whole point of this class is to schedule tasks to run at the right moment. When you give it an already created task, the task is already running, and the `ExecutionQueue` class loses the reason for its existence. – Theodor Zoulias Apr 09 '20 at 10:39
  • @TheodorZoulias: OK, I see your point. In my use case, how can I add them to the queue if I don't want to change the method signature of SaveValesToDB()? – jamilia Apr 09 '20 at 16:09

1 Answer

You could add this method to Stephen Cleary's ExecutionQueue class:

public Task Run(Action action)
{
    return Run(() => Task.Run(action));
}

This is an overload of the existing public Task Run(Func<Task> lambda) method. This one delegates the execution of the supplied action to a ThreadPool thread.

Usage example:

var id = GetID();
var task = taskQueue.Run(() => DBInstance.DBSaveValue1(id));
await task; // Optional

Update: To propagate error notifications to the main thread, you could enhance the ExecutionQueue class with an Error event, which would be invoked in the captured context (captured at the time that the instance was created).

private readonly SynchronizationContext _capturedContext;

public event EventHandler<Exception> Error;

public ExecutionQueue() // Constructor
{
    _capturedContext = SynchronizationContext.Current ?? new SynchronizationContext();
    Completion = Task.Run(() => ProcessQueueAsync());
}

private void OnError(Exception ex)
{
    var handler = Error;
    if (handler == null) return;
    _capturedContext.Post(_ => handler.Invoke(this, ex), null);
}

The OnError method should be called from inside the catch (Exception ex) block of the Run method. This will work with Windows Forms apps and WPF apps, because their UI thread is equipped with a SynchronizationContext. It will not work as intended in a Console app because there is no SynchronizationContext there (the Error event will be raised on an arbitrary ThreadPool thread).
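To show how the pieces above might fit together, here is a minimal sketch of the whole class with both Run overloads and the Error event. The body of Run(Func<Task>) is an assumption: it presumes the method from the linked answer wraps the lambda in a try/catch and completes a TaskCompletionSource, which is where the OnError call would go. Treat it as an illustration rather than the definitive implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class ExecutionQueue
{
    private readonly BlockingCollection<Func<Task>> _queue =
        new BlockingCollection<Func<Task>>();
    private readonly SynchronizationContext _capturedContext;

    public event EventHandler<Exception> Error;

    public ExecutionQueue()
    {
        // Capture the context of the creating thread (the UI thread in WPF/WinForms).
        _capturedContext = SynchronizationContext.Current ?? new SynchronizationContext();
        Completion = Task.Run(() => ProcessQueueAsync());
    }

    public Task Completion { get; }

    public void Complete() => _queue.CompleteAdding();

    private async Task ProcessQueueAsync()
    {
        // Each queued delegate is started and awaited before the next one is taken.
        foreach (var work in _queue.GetConsumingEnumerable())
            await work();
    }

    // Assumed shape of the original method: run the lambda, mirror its outcome on tcs.
    public Task Run(Func<Task> lambda)
    {
        var tcs = new TaskCompletionSource<object>();
        _queue.Add(async () =>
        {
            try
            {
                await lambda();
                tcs.TrySetResult(null);
            }
            catch (OperationCanceledException ex)
            {
                tcs.TrySetCanceled(ex.CancellationToken);
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex); // surfaces if the caller awaits the returned task
                OnError(ex);             // notifies subscribers even if nobody awaits
            }
        });
        return tcs.Task;
    }

    // Overload for synchronous work: the action runs on a ThreadPool thread.
    public Task Run(Action action) => Run(() => Task.Run(action));

    private void OnError(Exception ex)
    {
        var handler = Error;
        if (handler == null) return;
        _capturedContext.Post(_ => handler.Invoke(this, ex), null);
    }
}
```

A caller would subscribe once, for example in the main Window constructor, with something like taskQueue.Error += (sender, ex) => Log(ex); where Log is a hypothetical logging method. To keep other queued items from running until a getter has returned, put the getter call and everything that depends on its result inside the same Action passed to Run.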

Theodor Zoulias
  • Thanks. How about my Q2: if I encounter a DB getter method, I need to wait for it to return results and only then process other items in the queue. Is that possible? I should still be able to add items to the queue so my main thread isn't blocked. – jamilia Apr 09 '20 at 17:28
  • @jamilia the `ExecutionQueue` processes the queue sequentially. When a task is finished, it immediately runs the next one. The only way to prevent the next one from running is to delay the completion of the previous one. So you just need to include everything you want to happen exclusively inside the same `Func<Task>` or `Action`, and pass it to `ExecutionQueue.Run`. – Theodor Zoulias Apr 09 '20 at 17:42
  • Is it true that the next one runs only once a task has finished? We are running them asynchronously, correct? So as soon as the first task in the queue starts processing, the next task in the queue will start processing without waiting for the previous task to complete, correct? – jamilia Apr 09 '20 at 17:46
  • @jamilia no. Look at the `ProcessQueueAsync`. There is an `await` in there: `foreach (var value in _queue.GetConsumingEnumerable()) await value();`. Each queued task is created, then awaited, and then the loop continues. – Theodor Zoulias Apr 09 '20 at 17:52
  • Oh, OK. Yes, the whole idea of queuing them is to run them in order but asynchronously. I got confused. So we run a separate consumer thread that runs all the tasks in the queue, which makes it async. – jamilia Apr 09 '20 at 19:04
  • @jamilia what makes it async is that the `Run` method returns a `Task`, and returns it immediately without blocking the current thread. You can then `await` this task, which is an inherently asynchronous operation since it doesn't block the current thread either. But even if you don't `await` it and instead you issue another `Run` command, the `ExecutionQueue` class will ensure that the next command will not start before the completion of the previous one. – Theodor Zoulias Apr 09 '20 at 19:14
  • Thanks again. Any idea how I can handle the errors? As I understand it, with async code the exceptions can get swallowed, and it will be very hard for developers to identify the cause. I also searched for an NUnit test but couldn't find one. Can you point out how I can handle them, or any article that will give me a good start? – jamilia Apr 10 '20 at 01:20
  • @jamilia just `await` the task returned by `Run`, and any exception will be surfaced. If you don't want to `await` the task for some reason, it means that probably you are not interested if it failed or not. – Theodor Zoulias Apr 10 '20 at 01:45
  • I can't use 'await task' as my method is not async. Although I don't want to wait for any results, I would like to know if there were any exceptions and log them. If any DB call or any queued statement throws an exception, I may not know the underlying cause, and all of the tasks lined up in the queue are not processed either. I would like to address this, and I would like to know the best possible ways. – jamilia Apr 10 '20 at 06:22
  • @jamilia for logging the exceptions you can add logging code in the `Run` method, inside the `catch (Exception ex)` block. – Theodor Zoulias Apr 10 '20 at 10:35
  • I see, but that doesn't get propagated to the calling main thread, correct? Although in my use case it would be sufficient to capture the exception in the Run method, I would like to know if there is a way I can propagate it to the main thread if it needs to take effect at any point. Is there a context that I can set here? If so, how is that done? – jamilia Apr 11 '20 at 20:37
  • @jamilia I updated my answer with a mechanism that propagates error notifications to the UI thread. – Theodor Zoulias Apr 11 '20 at 22:06
  • Should the SynchronizationContext be grabbed from the main thread or from the second thread? From the updated code, I don't see how the error is reported to the main thread. Should the 'Error' event be created in the main thread to handle the exception? I'm using a WPF app, not a console app. – jamilia Apr 13 '20 at 06:21
  • How and where should we create the subscriber for 'Error' so my handler is initialized? – jamilia Apr 13 '20 at 06:28
  • @jamilia yes, the context should be captured from the main thread (the UI thread), because this is the only thread that has a `SynchronizationContext` installed. I suggest that you instantiate the `ExecutionQueue` in the main `Window` constructor, or in the `Application_Startup` event. In order to get the notifications you must also subscribe to the `Error` event from somewhere, from the main `Window` constructor for example. I omitted that part from the answer, because I don't know what you want to do with the notifications. – Theodor Zoulias Apr 13 '20 at 06:59