
The work I'm trying to schedule is using the await keyword, hence the work itself is of type async Task.

I can't use Task.Factory.StartNew, even when I pass my scheduler, because it considers the task "done" when the work hits its first await, so the work leaves my scheduler and continues on the default scheduler.

I can't use var a = new Task(DoWork....); a.Start(myScheduler), because I'd be using an async void method, and we know the pitfalls of that.

I can't use Task.Run(), because it doesn't allow you to change the scheduler on which the task will be scheduled to launch.

If I can't use any of that, how will I achieve my goal, which is to schedule my async Work, which has to complete execution on the scheduler it was initially started on?

SpiritBob
  • It is not possible to throttle async operations with the `TaskScheduler` class. You can see an experiment [here](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await/57702536#57702536), where I tried (and failed) to do just that. To throttle async operations you need a `SemaphoreSlim`, or you can create the desired number of worker tasks to process your workload, or use the TPL Dataflow library, which is async-friendly and lets you define the level of concurrency by setting the `MaxDegreeOfParallelism` option. – Theodor Zoulias Mar 09 '20 at 14:38
  • @TheodorZoulias when you say "create desired number of worker tasks" you mean long-running tasks which will do the work as it comes? Is there an example of Dataflow library usage, with `MaxDegreeOfParallelism`? First time hearing about it. – SpiritBob Mar 09 '20 at 14:44
  • Yes, long running tasks that each one creates and awaits one async operation at a time. This is how the TPL Dataflow library works internally (I think). You can see an example of this approach [here](https://stackoverflow.com/a/56862796/11178549). For an example of using the TPL Dataflow library with async code and `MaxDegreeOfParallelism` look [here](https://stackoverflow.com/a/59574072/11178549). – Theodor Zoulias Mar 09 '20 at 14:52
  • @TheodorZoulias That's so cool! I've decided to go with the semaphoreSlim approach, although I think performance would be slightly better if I used the action block, simply because I won't be starting X tasks, which would asynchronously wait for the semaphore to be released, versus starting a single task (if `MaxDegreeOfParallelism` = 1) and when that task completes - start the next one? Is that how it works? Any idea if the next started task will be based on `TaskCreationOptions.PreferFairness`? Meaning the sooner it was scheduled, the more likely for it to be executed? – SpiritBob Mar 09 '20 at 15:13
  • Creating a task with `SemaphoreSlim.WaitAsync` has minimal overhead, and you shouldn't observe any noticeable difference unless your workload has several million items. About the option `TaskCreationOptions.PreferFairness` I have never used it personally. I wouldn't expect it to make any difference though, but you could experiment with it if squeezing some milliseconds is important in your case. :-) – Theodor Zoulias Mar 09 '20 at 15:21
  • @TheodorZoulias is it a problem if I have an `ActionBlock<>` that will live throughout the whole application's lifespan, constantly queuing/ingesting work which is handled when appropriate? Is `ActionBlock<>` designed with this kind of use in mind, or will I be abusing a feature? (Long-lived instance, never calling `ActionBlock<>.Complete()` ) – SpiritBob May 22 '20 at 08:37
  • No, there should be no problem with using an `ActionBlock` for years. The Dataflow blocks are not known to leak resources. The only consideration is the size of its input queue. If you leave it unbounded, and it constantly receives more messages than it can process, the queue will grow larger and larger, and will eventually explode with an `OutOfMemory` exception. For this reason it is a good idea to use the `BoundedCapacity` option when you create the block. If you use this option, be careful to feed the block with `SendAsync` instead of `Post`, otherwise you may lose messages. – Theodor Zoulias May 22 '20 at 14:19
  • @TheodorZoulias I've opened a [question](https://stackoverflow.com/questions/62016856/avoiding-use-of-actionblocktinput-post-when-postdataflowblockoptions-boundedca) in regards to your last comment. Feel free to answer there. I'd love to reward some bounty points for all the helpful information you've covered here! – SpiritBob May 26 '20 at 07:36
  • OK, I'll take a look at that question later. :-) – Theodor Zoulias May 26 '20 at 09:02
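
The `ActionBlock` setup discussed in the comments above can be sketched roughly as follows. This is only an illustration, not code from any of the commenters: the class name `BoundedActionBlockDemo`, the method `RunAsync`, the limits (2 and 4) and the `Task.Delay` standing in for real work are all made up, and it assumes the `System.Threading.Tasks.Dataflow` library is available to the project (on some targets it has to be added as a NuGet package).

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedActionBlockDemo
{
    // Hypothetical helper: pushes itemCount items through a bounded ActionBlock and
    // reports how many were processed and the highest concurrency that was observed.
    public static async Task<(int Processed, int MaxConcurrent)> RunAsync(int itemCount)
    {
        var sync = new object();
        int processed = 0, running = 0, maxConcurrent = 0;

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2, // at most two async handlers in flight
            BoundedCapacity = 4         // queued plus in-flight items are capped at four
        };

        var block = new ActionBlock<int>(async item =>
        {
            lock (sync) { running++; maxConcurrent = Math.Max(maxConcurrent, running); }
            await Task.Delay(10); // placeholder for the real async work
            lock (sync) { running--; processed++; }
        }, options);

        for (int i = 0; i < itemCount; i++)
        {
            // SendAsync waits asynchronously while the block is full;
            // Post would return false and the item would be dropped.
            await block.SendAsync(i);
        }

        block.Complete();       // only needed here because the demo has to finish
        await block.Completion;
        return (processed, maxConcurrent);
    }
}
```

For a long-lived block as described above, the `Complete()` call would simply be left out until shutdown; awaiting `BoundedActionBlockDemo.RunAsync(20)` should report all 20 items processed with no more than two handlers running at once.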

1 Answer


First, be aware that the TaskScheduler only applies to executing tasks. So when using an async delegate, the TaskScheduler will only be used in-between await points; when the async code is (asynchronously) waiting in an await, then there is no code "in" that scheduler. For some schedulers this isn't a problem, but it is a problem with schedulers like ConcurrentExclusiveSchedulerPair because the code is not "in" the scheduler during an await.
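
To make that point concrete, here is a small sketch (my illustration, with made-up names such as `ExclusiveSchedulerDemo`, `RunAsync` and `gate`) that starts three async workers on the exclusive scheduler of a `ConcurrentExclusiveSchedulerPair`. Each worker parks at an `await`, and the method reports how many workers are "in the middle" of their work at the same time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ExclusiveSchedulerDemo
{
    // Hypothetical helper: returns how many workers were parked at their await
    // simultaneously, even though all of them were started on an exclusive scheduler.
    public static async Task<int> RunAsync()
    {
        const int workerCount = 3;
        var exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var allEntered = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        int inside = 0;

        Func<Task> work = async () =>
        {
            if (Interlocked.Increment(ref inside) == workerCount)
                allEntered.SetResult(true);
            await gate.Task; // while waiting here, the worker is not "in" the scheduler
            Interlocked.Decrement(ref inside);
        };

        var tasks = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            tasks[i] = Task.Factory.StartNew(work, CancellationToken.None,
                TaskCreationOptions.DenyChildAttach, exclusive).Unwrap();
        }

        // If the scheduler were exclusive across awaits, this would never complete.
        await allEntered.Task;
        int observed = Volatile.Read(ref inside);

        gate.SetResult(true);
        await Task.WhenAll(tasks);
        return observed;
    }
}
```

Awaiting `ExclusiveSchedulerDemo.RunAsync()` returns 3: the scheduler still runs only one synchronous segment at a time, but all three workers have entered their "exclusive" section before any of them has left it.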

If your TaskScheduler will work as expected with async code, then you can create a task running on it by calling Task.Factory.StartNew and passing your TaskScheduler. This will return a Task<Task> or Task<Task<T>> since StartNew doesn't understand async code; you can call Unwrap() on that value to get a "normal" asynchronous Task/Task<T>.
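
The difference between the outer `Task<Task>` and the unwrapped task can be seen in a short sketch like the one below. The names `UnwrapDemo`, `RunAsync` and `gate` are made up for illustration, and the `scheduler` parameter stands in for whatever custom scheduler you have.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Hypothetical helper: shows that the outer task finishes at the first await,
    // while the unwrapped task stays incomplete until the async work really ends.
    public static async Task<(bool OuterDoneEarly, bool UnwrappedDoneEarly)> RunAsync(TaskScheduler scheduler)
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        Func<Task> work = async () => { await gate.Task; };

        Task<Task> outer = Task.Factory.StartNew(work, CancellationToken.None,
            TaskCreationOptions.DenyChildAttach, scheduler);
        Task unwrapped = outer.Unwrap();

        await outer; // completes once the delegate returns, i.e. at its first await
        bool outerDoneEarly = outer.IsCompleted;
        bool unwrappedDoneEarly = unwrapped.IsCompleted; // the gate is still closed

        gate.SetResult(true); // let the async work finish
        await unwrapped;
        return (outerDoneEarly, unwrappedDoneEarly);
    }
}
```

With `TaskScheduler.Default` passed in, the result is `(true, false)`: the outer task only represents starting the async method, which is why the unwrapped task is the one to await.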

Personally, I prefer creating a dedicated TaskFactory instance with your own TaskScheduler (and other options), and then calling StartNew on that factory followed by Unwrap. E.g.:

```csharp
var factory = new TaskFactory(CancellationToken.None, TaskCreationOptions.DenyChildAttach,
    TaskContinuationOptions.DenyChildAttach, myTaskScheduler);
var task = factory.StartNew(() => MyCodeAsync()).Unwrap();
```

If desired, I have written some Run overloads for TaskFactory that make the TaskFactory usage look more like Task.Run:

```csharp
// Equivalent to the above code
var factory = new TaskFactory(CancellationToken.None, TaskCreationOptions.DenyChildAttach,
    TaskContinuationOptions.DenyChildAttach, myTaskScheduler);
var task = factory.Run(() => MyCodeAsync());
```

Stephen Cleary
  • The `TaskScheduler` I'm using is indeed coming from `ConcurrentExclusiveSchedulerPair`. Is there a possible work-around to the issues you've outlined, or my only alternative is to somehow avoid all the `awaits` inside? It's an honor to have you answer my question. – SpiritBob Mar 09 '20 at 14:06
  • I'd say the best solution is to replace the entire `TaskScheduler` with appropriately-placed `SemaphoreSlim` locks within the code. If that's too much work, then you'd need to avoid `async`/`await` in any code running on that `TaskScheduler`. – Stephen Cleary Mar 09 '20 at 14:19
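
A rough sketch of the `SemaphoreSlim` approach suggested in the last comment could look like this. It is an illustration only: `SemaphoreMutexDemo`, `RunAsync` and the `Task.Delay` placeholder are made-up names and stand-ins, and a real code base would put the semaphore around its own critical sections.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreMutexDemo
{
    // Hypothetical helper: runs several async workers and returns the highest number
    // that were ever inside the protected section at the same time.
    public static async Task<int> RunAsync(int workers)
    {
        var mutex = new SemaphoreSlim(1, 1); // one permit: behaves like an async-friendly lock
        int inside = 0, maxInside = 0;

        async Task WorkAsync()
        {
            await mutex.WaitAsync();
            try
            {
                // Unlike the exclusive scheduler, the permit stays held across the await.
                inside++;
                maxInside = Math.Max(maxInside, inside);
                await Task.Delay(10); // placeholder for the real async work
                inside--;
            }
            finally
            {
                mutex.Release();
            }
        }

        await Task.WhenAll(Enumerable.Range(0, workers).Select(_ => WorkAsync()));
        return maxInside;
    }
}
```

Awaiting `SemaphoreMutexDemo.RunAsync(5)` returns 1, since only one worker holds the permit at any moment. Raising the initial count of the `SemaphoreSlim` turns the same pattern into a throttle instead of a mutex.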