What I have

I have a set of asynchronous processing methods, similar to:

public class AsyncProcessor<T>
{
    //...rest of members, etc.

    public Task Process(T input)
    {
        //Some special processing, most likely inside a Task, so
        //maybe spawn a new Task, etc.
        Task task = Task.Run(/* maybe private method that does the processing*/);
        return task;
    }
}

What I want

I would like to chain them all together, to execute in sequential order.

What I tried

I have tried to do the following:

public class CompositeAsyncProcessor<T>
{
    private readonly IEnumerable<AsyncProcessor<T>> m_processors;

    //Constructor receives the IEnumerable<AsyncProcessor<T>> and
    //stores it in the field above.

    public Task ProcessInput(T input)
    {
        Task chainedTask = Task.CompletedTask;

        foreach (AsyncProcessor<T> processor in m_processors)
        {
            chainedTask = chainedTask.ContinueWith(t => processor.Process(input));
        }

        return chainedTask;
    }
}

What went wrong

However, tasks do not run in order because, from what I have understood, inside the call to ContinueWith, the processor.Process(input) call is performed immediately and the method returns independently of the status of the returned task. Therefore, all processing Tasks still begin almost simultaneously.

My question

My question is whether there is something elegant that I can do to chain the tasks in order (i.e. without execution overlap). Could I achieve this using the following statement, (I am struggling a bit with the details), for example?

chainedTask = chainedTask.ContinueWith(async t => await processor.Process(input));

Also, how would I do this without using async/await, only ContinueWith?

Why would I want to do this?

Because my Processor objects have access to, and request things from "thread-unsafe" resources. Also, I cannot just await all the methods because I have no idea about how many they are, so I cannot just write down the necessary lines of code.

What do I mean by thread-unsafe? A specific problem

Because I may be using the term incorrectly, an illustration is a bit better to explain this bit. Among the "resources" used by my Processor objects, all of them have access to an object such as the following:

public interface IRepository
{
    void Add(object obj);

    bool Remove(object obj);

    IEnumerable<object> Items { get; }
}

The implementation currently used is relatively naive. So some Processor objects add things, while others retrieve the Items for inspection. Naturally, one of the exceptions I get all too often is:

InvalidOperationException: Collection was modified, enumeration operation may not execute.

I could spend some time locking access and pre-running the enumerations. However, this was the second option I would get down to, while my first thought was to just make the processes run sequentially.

Why must I use Tasks?

While I have full control in this case, for the purposes of the question assume that I cannot change the base implementation: what would I do if I were stuck with Tasks? Furthermore, the operations are relatively time-consuming and CPU-bound, and I am trying to keep the user interface responsive, so I needed to offload some of the work to asynchronous operations. In most of my use cases I do not need to chain several of them; I run a single one each time (or a couple, but always a fixed, known number, so I was able to hook them together without iteration and without async/await). One use case, however, finally required chaining an unknown number of Tasks together.

How I deal with this currently

The way I am dealing with this currently is to append a call to Wait() inside the ContinueWith call, i.e.:

foreach (AsyncProcessor<T> processor in m_processors)
{
    chainedTask = chainedTask.ContinueWith(t => processor.Process(input).Wait());
}

I would appreciate any idea on how I should do this, or how I could do it more elegantly (or, "async-properly", so to speak). Also, I would like to know how I can do this without async/await.

Why my question is different from this question, which did not answer my question entirely.

Because the linked question has two tasks, so the solution is to simply write the two lines required, while I have an arbitrary (and unknown) number of tasks, so I need a suitable iteration. Also, my method is not async. I now understand (from the single briefly available answer, which was deleted) that I could do it fairly easily if I changed my method to async and awaited each processor's Task method, but I still wish to know how this could be achieved without async/await syntax.

Why my question is not a duplicate of the other linked questions

Because none of them explains how to chain correctly using ContinueWith, and I am interested in a solution that uses ContinueWith and does not rely on the async/await pattern. I know this pattern may be the preferable solution, but I want to understand how (if possible) to do arbitrary chaining properly using ContinueWith calls. I now know I don't need ContinueWith. The question is, how do I do it with ContinueWith?

Vector Sigma
  • Part of being a programmer is learning to recognize an abstracted form of identical problems. In your case, the marked duplicates do answer your question, because they explain the usage of `await` and other techniques to ensure that a given `Task` has completed before moving on to the next step of processing. Note that in general, and certainly in your scenario, the use of `ContinueWith()` is superfluous. You are accomplishing nothing that a call to `await` would do just as well. So in your loop, just `await` the task, then call `Process()`. – Peter Duniho Oct 10 '19 at 22:50
  • `ContinueWith` [predates](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.continuewith) the async/await constructs as it was available with C# 4.0 and the .NET Framework 4.0 already (TPL), while async/await were introduced with C# 5.0. I have understood how tasks are actually "awaited" when **await** is a thing (i.e. a language keyword). I also want to know how this was done **without** async/await. How did developers **prefer** to do it back then? – Vector Sigma Oct 10 '19 at 23:18
  • I wouldn't say we "preferred" to do it at all. It was such a pain without language support. There were lots of different mechanisms, all involving inter-thread communication via synchronization objects like `Monitor`, `SemaphoreSlim`, wait handles, etc. Maybe I've misunderstood and your question is not in fact asking "how to" but instead is asking for a historical dissertation. Unfortunately, the latter would be unsuitable for Stack Overflow as "too broad". The marked duplicates are at least a better fit for Stack Overflow in terms of the content keeping an appropriate focus. – Peter Duniho Oct 10 '19 at 23:29
  • I understand. Maybe the `Wait()`-based solution would have been enough on its own, to produce the desired result. I have researched a bit and the only alternative I could come up with, relying solely on `ContinueWith`, would be to ContinueWith (i.e. return the inner Task, in my code), then `Unwrap()` the total Task and then chain onto **that** Task, looping over. An important thing I have come to learn in the process, though, is that the **async** keyword does not make any difference to _consumers_. As long as a method returns a Task, it can be awaited in some other async method. – Vector Sigma Oct 10 '19 at 23:37
  • How is async/await related with requesting things from "thread-unsafe" resources? How does replacing `await` with `ContinueWith` affect thread-safety? – Theodor Zoulias Oct 10 '19 at 23:51
  • @Vector: in the code you provided, you never use `t` in the `ContinueWith()`. And it's not a [mcve] so it's impossible to know exactly what the code is doing. But, it _looks_ like you are simply applying the `Process()` method for each "processor" object to the `input`. As such, a simple loop that has an `await processor.Process(input);` _seems_ like it should suffice for your needs. There's no need to literally create a `Task` object that represents a "chained list" of the tasks, because that will manifest implicitly once you make the method `async` and simplify your loop as suggested. – Peter Duniho Oct 11 '19 at 00:02
  • (I'll note that this suggestion is essentially the same provided in the now-deleted answer from Alexei. You seem to have gotten confused about the name `RunSequentially` -- the method name isn't special, it's just what he called the helper method he included in his answer to show how you might implement the core of your `ProcessInput()` method) – Peter Duniho Oct 11 '19 at 00:09
  • @PeterDuniho Yes. I was mostly interested in chaining Tasks using `ContinueWith`. It's probably by returning a new Task and unwrapping it to use `ContinueWith` on it, too. – Vector Sigma Oct 11 '19 at 00:15
  • @TheodorZoulias It does not affect thread-safety. All I need is for the Tasks to not run in parallel, because they use resources in their code, which are not designed to concurrently serve the requests they are designed to serve, so things can get messed up. I used await to work **around** thread-unsafety. Maybe thread-safety is not the right statement. In any case, the resources are not designed to serve **concurrent** requests. Also, replacing `await` with `ContinueWith` does not affect this thread-safety. I am not looking to **replace** anything, I am just interested in the **alternative**. – Vector Sigma Oct 11 '19 at 00:21
  • _"I was mostly interested in chaining Tasks using ContinueWith"_ -- why? ignoring for a moment that your question mostly does _not_ seem to be asking that (the `ContinueWith()` seems at best to be a secondary question, one that just increases the broadness of the question), what would be the point of that? Using `await` does the exact same thing, but with compiler support (and thus is more "elegant", to quote your stated goal). – Peter Duniho Oct 11 '19 at 00:23
  • Well, yes, I did not know what I now (think) I know: making a Task returning method **async** makes no actual difference to _consumers_... it simply helps one use await, which makes one's life easier. So I was not prepared to change the method to use async and, as a result, I was looking for the alternative. Since making a method async makes no difference to the "outer world", I will probably use that for now. – Vector Sigma Oct 11 '19 at 00:39
  • Okay. FWIW, a simple (but IMHO inelegant) way to use `ContinueWith()` would be to add a call to `Unwrap()`. I.e. `chainedTask = chainedTask.ContinueWith(t => processor.Process(input)).Unwrap()`; This has the effect of ignoring the original `ContinueWith()` task, which of course completes immediately, and chaining only the inner task that was produced. Using `await` is better, but I guess that would be an answer to your secondary question. That's also a duplicate so I'll add a couple of matching ones to the list. – Peter Duniho Oct 11 '19 at 00:44
  • Thank you very much. Yes, that is almost like what I was assuming. It is not very easy to "digest" things asynchronous/parallel, such as, for example, what "Unwrap" exactly does, even after reading a lot about it. The whole concept of Tasks, asynchronous programming etc. is fairly complicated in and of itself and, sometimes, significant insight might be found even in the tiniest of phrases or explanations on the subject. An explanation to the precise **bit** that bothers one's understanding is not that easy to "unwrap", but I think I am getting this now. – Vector Sigma Oct 11 '19 at 01:41
  • I think [this](https://stackoverflow.com/questions/11783277/what-is-the-proper-way-to-chain-tasks-when-returning-a-task) also somehow answers some important parts of my question. – Vector Sigma Oct 11 '19 at 01:43
  • This answer was (obviously) closed as a duplicate initially, but it was reopened. From the exchange above, I (and others) can clearly understand why it was closed as a duplicate. Could someone (among those voting to re-open it) let me know their reasons for re-opening? I am glad it was re-opened, of course, but I would not like to base that on my own _subjective_ opinion of this being a fair question that is **really** not a(n exact) duplicate. So... is this an OK question? – Vector Sigma Oct 11 '19 at 20:25
  • I have my doubts that accessing a variable from multiple threads non-concurrently is 100% thread safe. I have asked [a question about it](https://stackoverflow.com/questions/58264302/asynchronous-code-shared-variables-thread-pool-threads-and-thread-safety), and the feedback I got was to stop worrying about it, but I still do. So in your case I would still lock around every access to the `IRepository` object, just to preserve my peace of mind. Btw I voted "Reopen" simply because I wanted to answer this question. :-) – Theodor Zoulias Oct 11 '19 at 21:17
  • Btw if I was in your shoes I wouldn't invest much time optimizing the `CompositeAsyncProcessor` class, because there are already libraries available that can do what it does, and much more, and are highly performant, extensively tested and actively maintained. I have in mind the [TPL Dataflow](https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library), and in particular the [`ActionBlock`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1) class. Studying this library could be a better investment of your time. – Theodor Zoulias Oct 11 '19 at 21:30
  • That's interesting! Thank you for the information, I will take a look into it. I will almost definitely **not** use the `ContinueWith` methodology and turn to async/await. However, this question and the answers (including yours) have helped me clarify a few things I was not entirely certain about, so it has been helpful despite the "inapplicability" of the matter at hand. – Vector Sigma Oct 11 '19 at 22:04

2 Answers

A foreach loop combined with await runs the processors sequentially:

    public async Task ProcessInputAsync(T input)
    {
        foreach (var processor in m_processors)
        {
            await processor.Process(input);
        }
    }

By the way, by convention Process should be named ProcessAsync.
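
As a minimal, self-contained sketch of the loop above: RecordingProcessor and SequentialAwaitDemo are hypothetical names that do not appear in the question, and the Task.Delay only simulates work. Each processor logs when it starts and when it ends, so the log shows whether executions overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's AsyncProcessor<T>: it records when
// it starts and finishes so that the execution order can be inspected.
public class RecordingProcessor
{
    private readonly string m_name;
    private readonly List<string> m_log;

    public RecordingProcessor(string name, List<string> log)
    {
        m_name = name;
        m_log = log;
    }

    public Task ProcessAsync(int input)
    {
        return Task.Run(async () =>
        {
            lock (m_log) m_log.Add(m_name + " start");
            await Task.Delay(20); // simulated work
            lock (m_log) m_log.Add(m_name + " end");
        });
    }
}

public static class SequentialAwaitDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var processors = new[]
        {
            new RecordingProcessor("A", log),
            new RecordingProcessor("B", log),
            new RecordingProcessor("C", log),
        };

        foreach (RecordingProcessor processor in processors)
        {
            // The next iteration does not begin until this task has completed.
            await processor.ProcessAsync(42);
        }

        return log;
    }
}
```

Because each await finishes before the next call to ProcessAsync is made, every "start" entry in the log is immediately followed by the matching "end" entry.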

tymtam

The method Task.ContinueWith does not understand async delegates the way Task.Run does, so when your delegate returns a Task, ContinueWith treats it as an ordinary return value and wraps it in another Task. You therefore end up with a Task<Task> instead of what you expected. The problem would be obvious if AsyncProcessor.Process returned a generic Task<T>: you would get a compile error, because there is no conversion from Task<Task<T>> to Task<T>. In your case the conversion is from Task<Task> to Task, which is legal, since Task<TResult> derives from Task.

Solving the problem is easy. You just need to unwrap the Task<Task> to a simple Task, and there is a built-in method Unwrap that does exactly that.
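
A small sketch of that difference, under the assumption that a TaskCompletionSource stands in for the work done by Process (UnwrapDemo and gate are illustrative names, not part of your code). The outer Task<Task> completes as soon as the delegate returns, while the unwrapped task stays pending until the inner task finishes.

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Reports whether the outer task and the unwrapped task were already
    // complete while the inner task was still pending.
    public static (bool OuterDoneBeforeInner, bool UnwrappedDoneBeforeInner) Run()
    {
        // Controls when the inner task finishes (stand-in for real work).
        var gate = new TaskCompletionSource<bool>();

        // The delegate returns a Task, so ContinueWith produces a Task<Task>.
        Task<Task> outer = Task.CompletedTask.ContinueWith(_ => (Task)gate.Task);

        // Unwrap gives a proxy Task that completes only when the inner task does.
        Task unwrapped = outer.Unwrap();

        outer.Wait(); // returns once the delegate has run and returned
        bool outerDone = outer.IsCompleted;
        bool unwrappedDone = unwrapped.IsCompleted;

        gate.SetResult(true); // let the inner task finish
        unwrapped.Wait();

        return (outerDone, unwrappedDone);
    }
}
```

Chaining the next ContinueWith onto the unwrapped task, rather than onto the outer one, is what keeps the steps from overlapping.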

There is another problem that you need to solve, though. Currently your code suppresses all exceptions that may occur in each individual AsyncProcessor.Process call, which I don't think was intended. So you must decide which strategy to follow in this case. Are you going to propagate the first exception immediately, or do you prefer to collect them all and propagate them at the end, bundled in an AggregateException, as Task.WhenAll does? The example below implements the first strategy.

public class CompositeAsyncProcessor<T>
{
    //...
    public Task Process(T input)
    {
        Task current = Task.CompletedTask;
        foreach (AsyncProcessor<T> processor in m_processors)
        {
            current = current.ContinueWith(antecessor =>
            {
                if (antecessor.IsFaulted)
                    return Task.FromException(antecessor.Exception.InnerException);
                return processor.Process(input);
            },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default
            ).Unwrap();
        }
        return current;
    }
}
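
For comparison, here is a rough sketch of the second strategy (run every step, then report all failures together). It is only an illustration under stated assumptions: CollectAllErrorsChain and RunAll are hypothetical names, each Func<Task> stands in for a processor.Process(input) call, and cancelled steps are simply ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CollectAllErrorsChain
{
    // Each element of 'steps' stands in for one processor.Process(input) call.
    public static Task RunAll(IEnumerable<Func<Task>> steps)
    {
        // Safe without locking: the continuations run one after another.
        var errors = new List<Exception>();
        Task current = Task.CompletedTask;

        foreach (Func<Task> step in steps)
        {
            current = current.ContinueWith(antecedent =>
            {
                // Remember the failure of the previous step, then carry on.
                if (antecedent.IsFaulted)
                    errors.AddRange(antecedent.Exception.InnerExceptions);
                return step();
            },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default
            ).Unwrap();
        }

        // Complete the returned task only after the last step has finished.
        var completion = new TaskCompletionSource<bool>();
        current.ContinueWith(last =>
        {
            if (last.IsFaulted)
                errors.AddRange(last.Exception.InnerExceptions);
            if (errors.Count > 0)
                completion.SetException(errors); // all failures, reported together
            else
                completion.SetResult(true);
        },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        return completion.Task;
    }
}
```

The TaskCompletionSource is used so that the Exception property of the returned task holds the collected exceptions directly, without an extra level of AggregateException nesting.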

I have used an overload of ContinueWith that allows configuring all the options, because the defaults are not ideal. The default TaskContinuationOptions value is None. By setting it to ExecuteSynchronously you minimize thread switches, since each continuation will run on the same thread that completed the previous one.

The default task scheduler is TaskScheduler.Current. By specifying TaskScheduler.Default you make it explicit that you want the continuations to run on thread-pool threads (in the exceptional cases where they cannot run synchronously). TaskScheduler.Current is context-specific, and if it ever surprises you, it won't be in a good way.

As you can see, there are a lot of gotchas with the old-school ContinueWith approach. Using the modern await in a loop is a lot easier to implement, and a lot more difficult to get wrong.

Theodor Zoulias