16

I've got the following scenario, which I think might be quite common:

  1. There is a task (a UI command handler) which can complete either synchronously or asynchronously.

  2. Commands may arrive faster than they are getting processed.

  3. If there is already a pending task for a command, the new command handler task should be queued and processed sequentially.

  4. Each new task's result may depend on the result of the previous task.

Cancellation should be observed, but I'd like to leave it outside the scope of this question for simplicity. Also, thread-safety (concurrency) is not a requirement, but re-entrancy must be supported.

Here's a basic example of what I'm trying to achieve (as a console app, for simplicity):

using System;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var asyncOp = new AsyncOp<int>();

            Func<int, Task<int>> handleAsync = async (arg) =>
            {
                Console.WriteLine("this task arg: " + arg);

                //await Task.Delay(arg); // make it async

                return await Task.FromResult(arg); // sync
            };

            Console.WriteLine("Test #1...");
            asyncOp.RunAsync(() => handleAsync(1000));
            asyncOp.RunAsync(() => handleAsync(900));
            asyncOp.RunAsync(() => handleAsync(800));
            asyncOp.CurrentTask.Wait();

            Console.WriteLine("\nPress any key to continue to test #2...");
            Console.ReadLine();

            asyncOp.RunAsync(() =>
            {
                asyncOp.RunAsync(() => handleAsync(200));
                return handleAsync(100);
            });

            asyncOp.CurrentTask.Wait();
            Console.WriteLine("\nPress any key to exit...");
            Console.ReadLine();
        }

        // AsyncOp
        class AsyncOp<T>
        {
            Task<T> _pending = Task.FromResult(default(T));

            public Task<T> CurrentTask { get { return _pending; } }

            public Task<T> RunAsync(Func<Task<T>> handler)
            {
                var pending = _pending;
                Func<Task<T>> wrapper = async () =>
                {
                    // await the prev task
                    var prevResult = await pending;
                    Console.WriteLine("\nprev task result:  " + prevResult);
                    // start and await the handler
                    return await handler();
                };

                _pending = wrapper();
                return _pending;
            }
        }

    }
}

The output:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800

prev task result:  800
this task arg: 200
this task arg: 100

Press any key to exit...

It works in accordance with the requirements, until re-entrancy is introduced in test #2:

asyncOp.RunAsync(() =>
{
    asyncOp.RunAsync(() => handleAsync(200));
    return handleAsync(100);
});

The desired output should be 100, 200, rather than 200, 100, because there's already a pending outer task for 100. That's obviously because the inner task executes synchronously, breaking the logic var pending = _pending; /* ... */ _pending = wrapper() for the outer task.

How to make it work for test #2, too?

One solution would be to enforce asynchrony for every task, with Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext(). However, I don't want to impose asynchronous execution upon the command handlers which might be synchronous internally. Also, I don't want to depend on the behavior of any particular synchronization context (i.e. relying upon that Task.Factory.StartNew should return before the created task has been actually started).

In the real-life project, I'm responsible for what AsyncOp is above, but have no control over the command handlers (i.e., whatever is inside handleAsync).

noseratio
  • 59,932
  • 34
  • 208
  • 486
  • 2
    I've been noticing your name a lot lately on a lot of good questions and answers, it is good to have new quality users coming to the site. – Scott Chamberlain Jan 29 '14 at 07:42
  • 1
    I know you already answered yourself, but this seems like a great place to use Rx. Also, what happens in your code if there is a 3rd call during execution of the first one? With Rx you could throthle the calls in one line, or even aggregate them batch the execution in second line. Also, cancelation is very easy with Rx – Krzysztof Skowronek Mar 13 '19 at 11:27

3 Answers3

14

I almost forgot it's possible to construct a Task manually, without starting or scheduling it. Then, "Task.Factory.StartNew" vs "new Task(...).Start" put me back on track. I think this is one of those few cases when the Task<TResult> constructor may actually be useful, along with nested tasks (Task<Task<T>>) and Task.Unwrap():

// AsyncOp
class AsyncOp<T>
{
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask { get { return _pending; } }

    public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
    {
        var pending = _pending;
        Func<Task<T>> wrapper = async () =>
        {
            // await the prev task
            var prevResult = await pending;
            Console.WriteLine("\nprev task result:  " + prevResult);
            // start and await the handler
            return await handler();
        };

        var task = new Task<Task<T>>(wrapper);
        var inner = task.Unwrap();
        _pending = inner;

        task.RunSynchronously(useSynchronizationContext ?
            TaskScheduler.FromCurrentSynchronizationContext() :
            TaskScheduler.Current);

        return inner;
    }
}

The output:

Test #1...

prev task result:  0
this task arg: 1000

prev task result:  1000
this task arg: 900

prev task result:  900
this task arg: 800

Press any key to continue to test #2...


prev task result:  800
this task arg: 100

prev task result:  100
this task arg: 200

It's now also very easy to make AsyncOp thread-safe by adding a lock to protect _pending, if needed.

Updated, this has been further improved with cancel/restart logic.

noseratio
  • 59,932
  • 34
  • 208
  • 486
  • 1
    `Task` constructors for the win! :-) – Theodor Zoulias Jul 13 '20 at 06:51
  • @TheodorZoulias, I was proud of it until Enigmativity posted his Rx version [here](https://stackoverflow.com/a/62868361/1768303) – noseratio Jul 13 '20 at 06:59
  • 1
    Nahh, your solution is better IMHO. Rx's learning curve is too steep, and it's semantics are too delicate. For example Enigmativity routinely provides examples with `Subscribe` lacking the `onError` handler, which will cause the process to crash if anything goes wrong. – Theodor Zoulias Jul 13 '20 at 07:14
  • 1
    @Tks :) That's exactly how I've always felt about Rx. It's super-cool but it's like a complete shift in how one should perceive asynchronous workflows. I once tried pushing it for a front-end project as an opportune to learn it more, and the fellow developers decided against that :) – noseratio Jul 13 '20 at 07:56
  • 1
    Recently I read a document called [Rx Design Guidelines](https://go.microsoft.com/fwlink/?LinkID=205219) (downloadable PDF) that contains some very basic stuff regarding the behavior of built-in Rx observables, and about how custom observables are expected to behave. There is an implicit contract in Rx, that is only partially imposed by the API. I wish I had read it earlier. It would have saved me from including redundant synchronization code in various custom implementations. – Theodor Zoulias Jul 13 '20 at 08:18
  • 1
    @TheodorZoulias, this paper is golden, I believe the minds behind it are Erik Meyer and Bart De Smet. – noseratio Jul 13 '20 at 08:36
  • 1
    Erik Meijer is a cool guy! I just watched him explaining Rx in a [15 min](https://channel9.msdn.com/Blogs/Charles/Erik-Meijer-Rx-in-15-Minutes) video. – Theodor Zoulias Jul 13 '20 at 11:23
1

Here is a solution that is worse on every aspect compared to the accepted answer, except from being thread-safe (which is not a requirement of the question). Disadvantages:

  1. All lambdas are executed asynchronously (there is no fast path).
  2. The executeOnCurrentContext configuration effects all lambdas (it's not a per-lambda configuration).

This solution uses as processing engine an ActionBlock from the TPL Dataflow library.

public class AsyncOp<T>
{
    private readonly ActionBlock<Task<Task<T>>> _actionBlock;

    public AsyncOp(bool executeOnCurrentContext = false)
    {
        var options = new ExecutionDataflowBlockOptions();
        if (executeOnCurrentContext)
            options.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

        _actionBlock = new ActionBlock<Task<Task<T>>>(async taskTask =>
        {
            try
            {
                taskTask.RunSynchronously();
                await await taskTask;
            }
            catch { } // Ignore exceptions
        }, options);
    }

    public Task<T> RunAsync(Func<Task<T>> taskFactory)
    {
        var taskTask = new Task<Task<T>>(taskFactory);
        if (!_actionBlock.Post(taskTask))
            throw new InvalidOperationException("Not accepted"); // Should never happen
        return taskTask.Unwrap();
    }
}
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Very interesting! Could you extended it with cancellation? I've found that in practice I almost always wanted to cancel the previous pending op, before starting a new instance of it, like something I did [there](https://stackoverflow.com/a/21639075/1768303). – noseratio Jul 13 '20 at 22:52
  • 1
    @noseratio making a thread-safe cancelable operation is inherently tricky, because the `Dispose` method of the `CancellationTokenSource` class is not thread-safe, and the documentation insists strongly that the class must be disposed. I have attempted an implementation [here](https://stackoverflow.com/questions/6960520/when-to-dispose-cancellationtokensource/61681938#61681938), which has an awful lot of code for such a simple requirement. I am seeing now that [your own attempt](https://stackoverflow.com/a/21639075/1768303) has quite a lot of code too. :-) – Theodor Zoulias Jul 14 '20 at 00:02
1

Microsoft's Rx does provide an easy way to do this kind of thing. Here's a simple (perhaps overly simple) way of doing it:

var subject = new BehaviorSubject<int>(0);

IDisposable subscription =
    subject
        .Scan((x0, x1) =>
        {
            Console.WriteLine($"previous value {x0}");
            return x1;
        })
        .Skip(1)
        .Subscribe(x => Console.WriteLine($"current value {x}\r\n"));

subject.OnNext(1000);
subject.OnNext(900);
subject.OnNext(800);

Console.WriteLine("\r\nPress any key to continue to test #2...\r\n");
Console.ReadLine();

subject.OnNext(200);
subject.OnNext(100);

Console.WriteLine("\r\nPress any key to exit...");
Console.ReadLine();

The output I get is this:

previous value 0
current value 1000

previous value 1000
current value 900

previous value 900
current value 800


Press any key to continue to test #2...

previous value 800
current value 200

previous value 200
current value 100


Press any key to exit...

It's easy to cancel at any time by calling subscription.Dispose().


Error handling in Rx is generally a little more bespoke than normal. It's not just a matter of throwing a try/catch around things. You also can repeat steps that error with a Retry operator in the case of things like IO errors.

In this circumstance, because I've used a BehaviorSubject (which repeats its last value whenever it is subscribed to) you can easily just resubscribe using a Catch operator.

var subject = new BehaviorSubject<int>(0);
var random = new Random();

IDisposable subscription =
    subject
        .Select(x =>
        {
            if (random.Next(10) == 0)
                throw new Exception();
            return x;
        })
        .Catch<int, Exception>(ex => subject.Select(x => -x))
        .Scan((x0, x1) =>
        {
            Console.WriteLine($"previous value {x0}");
            return x1;
        })
        .Skip(1)
        .Subscribe(x => Console.WriteLine($"current value {x}\r\n"));

Now with the .Catch<int, Exception>(ex => subject.Select(x => -x)) it inverts the value of the query should an exception be raised.

A typical output may be like this:

previous value 0
current value 1000

previous value 1000
current value 900

previous value 900
current value 800


Press any key to continue to test #2...

previous value 800
current value -200

previous value -200
current value -100


Press any key to exit...

Note the -ve numbers in the second half. An exception was handled and the query was able to continue.

Enigmativity
  • 113,464
  • 11
  • 89
  • 172