
I am creating a generic helper class that will help prioritise requests made to an API whilst restricting the degree of parallelism at which they occur.

Consider the key method of the application below:

    public IQueuedTaskHandle<TResponse> InvokeRequest<TResponse>(Func<TClient, Task<TResponse>> invocation, QueuedClientPriority priority, CancellationToken ct) where TResponse : IServiceResponse
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        _logger.Debug("Queueing task.");
        var taskToQueue = Task.Factory.StartNew(async () =>
        {
            _logger.Debug("Starting  request {0}", Task.CurrentId);
            return await invocation(_client);
        }, cts.Token, TaskCreationOptions.None, _schedulers[priority]).Unwrap();
        taskToQueue.ContinueWith(task => _logger.Debug("Finished task {0}", task.Id), cts.Token);
        return new EcosystemQueuedTaskHandle<TResponse>(cts, priority, taskToQueue);
    }

Without going into too many details, I want to invoke the tasks returned by the `Func<TClient, Task<TResponse>>` invocation when their turn in the queue arises. I am using a collection of queues constructed using `QueuedTaskScheduler`, indexed by a unique enumeration:

        _queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 3);
        _schedulers = new Dictionary<QueuedClientPriority, TaskScheduler>();
        //Enumerate the priorities
        foreach (var priority in Enum.GetValues(typeof(QueuedClientPriority)))
        {
            _schedulers.Add((QueuedClientPriority)priority, _queuedTaskScheduler.ActivateNewQueue((int)priority));
        }

However, I have had little success: I can't get the tasks to execute in a limited parallelised environment, which leads to 100 API requests being constructed, fired, and completed in one big batch. I can tell this using a Fiddler session:

[Fiddler capture: all requests fired and completed in a single batch]

I have read some interesting articles and SO posts (here, here and here) that I thought would detail how to go about this, but so far I have not been able to figure it out. From what I understand, the async nature of the lambda is working in a continuation structure as designed, which marks the generated task as complete, basically "insta-completing" it. This means that whilst the queues are working fine, running a generated `Task<T>` on a custom scheduler is turning out to be the problem.
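The "insta-completing" behaviour can be demonstrated without any custom scheduler at all. The sketch below (illustrative only; it uses the built-in `ConcurrentExclusiveSchedulerPair` as a stand-in concurrency-limited scheduler) shows that a scheduler slot is held only until the first `await`, after which the next queued task starts:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Repro
{
    static async Task Main()
    {
        // A scheduler that allows only ONE task to run at a time.
        var scheduler = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrencyLevel: 1).ConcurrentScheduler;

        var gate = new TaskCompletionSource<bool>();
        var allStarted = new CountdownEvent(10);

        var tasks = new Task[10];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Factory.StartNew(async () =>
            {
                allStarted.Signal();
                await gate.Task; // the scheduler slot is released HERE
            }, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
        }

        // If the limit were enforced across the await, this would deadlock,
        // because only the first task could have started. Instead, all ten
        // reach their await point immediately.
        allStarted.Wait();
        Console.WriteLine("All 10 started despite maxConcurrencyLevel = 1");

        gate.SetResult(true);
        await Task.WhenAll(tasks);
    }
}
```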

Daniel Park
  • If you're trying to handle synchronization, why not use synchronization primitives? Just wrap the request in a `SemaphoreSlim` (initialized to whatever level of parallelism you want to achieve) and you're done :) It even has async API, fortunately, so it's as simple as `try { await semaphore.WaitAsync(); ... } finally { semaphore.Release(); }`. – Luaan Jun 15 '15 at 12:33
  • @Luaan What about priorities? – svick Jun 16 '15 at 16:07
  • @svick Depends on how you want to handle it. You could simply have a separate semaphore for each priority, or you could have blocking collections that would be read by a single consumer that would handle the throttling. – Luaan Jun 16 '15 at 20:11
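Luaan's `SemaphoreSlim` suggestion can be sketched as follows (the `Throttler` class and `maxParallelism` name are illustrative, not from the question). Because the semaphore is held across the `await`, it limits the whole asynchronous operation, unlike a `TaskScheduler`:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Throttler
{
    private readonly SemaphoreSlim _semaphore;

    public Throttler(int maxParallelism)
    {
        // The semaphore's initial count is the allowed degree of parallelism.
        _semaphore = new SemaphoreSlim(maxParallelism);
    }

    public async Task<T> ThrottledInvoke<T>(Func<Task<T>> invocation)
    {
        await _semaphore.WaitAsync();
        try
        {
            // At most maxParallelism invocations run concurrently, including
            // their asynchronous parts, because the slot is held until the
            // awaited task completes.
            return await invocation();
        }
        finally
        {
            _semaphore.Release();
        }
    }
}
```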

2 Answers


This means that whilst the queues are working fine, running a generated Task on a custom scheduler is turning out to be the problem.

Correct. One way to think about it[1] is that an async method is split into several tasks - it's broken up at each await point. Each one of these "sub-tasks" is then run on the task scheduler. So, the async method will run entirely on the task scheduler (assuming you don't use ConfigureAwait(false)), but at each await it will leave the task scheduler, and then re-enter that task scheduler after the await completes.

So, if you want to coordinate asynchronous work at a higher level, you need to take a different approach. It's possible to write the code yourself for this, but it can get messy. I recommend you first try ActionBlock<T> from the TPL Dataflow library, passing your custom task scheduler to its ExecutionDataflowBlockOptions.

[1] This is a simplification. The state machine will avoid creating actual task objects unless necessary (in this case, they are necessary because they're being scheduled to a task scheduler). Also, only await points where the awaitable isn't complete actually cause a "method split".
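A minimal sketch of the ActionBlock approach (it requires the System.Threading.Tasks.Dataflow NuGet package; the counters are added here only to make the limit observable). Unlike a TaskScheduler, ActionBlock counts an async delegate as "running" until its returned Task completes, so `MaxDegreeOfParallelism` limits the whole asynchronous operation, including the awaited call:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static int _running, _maxRunning;

    // Atomically record the highest concurrency level observed.
    static void RecordMax(int observed)
    {
        int current;
        while (observed > (current = Volatile.Read(ref _maxRunning)))
            Interlocked.CompareExchange(ref _maxRunning, observed, current);
    }

    static async Task Main()
    {
        var block = new ActionBlock<int>(
            async i =>
            {
                RecordMax(Interlocked.Increment(ref _running));
                await Task.Delay(100); // stand-in for the API call
                Interlocked.Decrement(ref _running);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

        for (int i = 0; i < 10; i++)
            block.Post(i);

        block.Complete();
        await block.Completion;

        // The observed concurrency never exceeds the configured limit.
        Console.WriteLine($"Limit respected: {_maxRunning <= 3}");
    }
}
```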

Stephen Cleary
  • Does the entire async method really run on the designated scheduler? Continuations could be called anywhere, usually on the thread pool. I think at the first split point it will leave the scheduler forever. – usr Jun 15 '15 at 12:58
  • The context captured by `await` is the current `SynchronizationContext` unless it is `null`, in which case it is the current `TaskScheduler`. So, assuming there's no SyncCtx (and no `ConfigureAwait(false)`, of course), the `async` continuations are guaranteed to resume on the `TaskScheduler`. – Stephen Cleary Jun 15 '15 at 13:30
  • I did not know that TaskAwaitable captures the current task scheduler. – usr Jun 15 '15 at 21:34
  • It's very rarely used. – Stephen Cleary Jun 16 '15 at 00:42
  • As I understand it, TDF is helpful in pipeline applications which I consider this a derivative of. So in essence, I should create a `TransformBlock` for each scheduler in `_schedulers`, then `Post(...)` whenever the generic method wrapper (`InvokeRequest`) is called? I plan to follow the `async`/`await` example as provided [here.](https://msdn.microsoft.com/en-us/library/hh228612(v=vs.110).aspx) – Daniel Park Jun 16 '15 at 01:31
  • I don't have a full picture of your code, but AFAICS a collection of `ActionBlock`s should suffice (one per task scheduler). I don't think `TransformBlock` would be necessary. – Stephen Cleary Jun 16 '15 at 01:41
  • I'm not sure `ActionBlock`s combined with `TaskScheduler`s will work, because you still won't be able to use those `TaskScheduler`s to limit `async` parallelism. `ExecutionDataflowBlockOptions.MaxDegreeOfParallelism` would work, but it's not obvious to me how to combine that with prioritization. – svick Jun 16 '15 at 15:18
  • @svick: On reflection, I believe you're right. I don't think a `TaskScheduler`-based approach will work at all. – Stephen Cleary Jun 16 '15 at 15:25
  • Excuse my inexperience with TPD, but doesn't an `ActionBlock` run a non-returnable delegate? As these calls are designed to wrap requests made to an API with an external SDK, won't a returnable type be required, like `TransformBlock`? – Daniel Park Jun 16 '15 at 19:43
  • @DanielPark: Depends on how you structure it. E.g., you can create a small "context" class that includes a `TaskCompletionSource<T>` (where `T` is your result type) and complete it from the ActionBlock. That will give you an awaitable that represents "completed processing in the Dataflow pipeline". BTW, do check out svick's comment; a different (non-`TaskScheduler`) solution may be necessary. – Stephen Cleary Jun 16 '15 at 21:29
  • I feel the same @StephenCleary, so will probably go that route. Will keep this thread in mind when a solution is made for closure. Additionally, after researching some more into TDF, there are some use cases in our service that could use this, so thanks for getting that ball rolling. – Daniel Park Jun 16 '15 at 21:59
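The "context" class Stephen describes in the comments above might be sketched like this (all class and member names here are illustrative; it requires the System.Threading.Tasks.Dataflow NuGet package). It lets a void-returning `ActionBlock` hand a `Task<T>` back to the caller:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Pairs a request delegate with a TaskCompletionSource the caller can await.
class RequestContext<T>
{
    public Func<Task<T>> Invocation { get; }
    public TaskCompletionSource<T> Completion { get; } = new TaskCompletionSource<T>();

    public RequestContext(Func<Task<T>> invocation) => Invocation = invocation;
}

class ThrottledExecutor<T>
{
    private readonly ActionBlock<RequestContext<T>> _block;

    public ThrottledExecutor(int maxParallelism)
    {
        _block = new ActionBlock<RequestContext<T>>(
            async ctx =>
            {
                // Propagate both results and failures to the caller's task.
                try { ctx.Completion.SetResult(await ctx.Invocation()); }
                catch (Exception ex) { ctx.Completion.SetException(ex); }
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelism });
    }

    public Task<T> InvokeAsync(Func<Task<T>> invocation)
    {
        var ctx = new RequestContext<T>(invocation);
        _block.Post(ctx);
        // An awaitable representing "completed processing in the pipeline".
        return ctx.Completion.Task;
    }
}
```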

Stephen Cleary's answer explains well why you can't use TaskScheduler for this purpose and how you can use ActionBlock to limit the degree of parallelism. But if you want to add priorities to that, I think you'll have to do that manually. Your approach of using a Dictionary of queues is reasonable; a simple implementation (with no support for cancellation or completion) of that could look something like this:

class Scheduler
{
    private static readonly Priority[] Priorities =
        (Priority[])Enum.GetValues(typeof(Priority));

    private readonly IReadOnlyDictionary<Priority, ConcurrentQueue<Func<Task>>> queues;
    private readonly ActionBlock<Func<Task>> executor;
    private readonly SemaphoreSlim semaphore;

    public Scheduler(int degreeOfParallelism)
    {
        queues = Priorities.ToDictionary(
            priority => priority, _ => new ConcurrentQueue<Func<Task>>());

        executor = new ActionBlock<Func<Task>>(
            invocation => invocation(),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = degreeOfParallelism,
                BoundedCapacity = degreeOfParallelism
            });

        semaphore = new SemaphoreSlim(0);

        Task.Run(Watch);
    }

    private async Task Watch()
    {
        while (true)
        {
            await semaphore.WaitAsync();

            // find item with highest priority and send it for execution
            foreach (var priority in Priorities.Reverse())
            {
                Func<Task> invocation;
                if (queues[priority].TryDequeue(out invocation))
                {
                    await executor.SendAsync(invocation);
                    break;
                }
            }
        }
    }

    public void Invoke(Func<Task> invocation, Priority priority)
    {
        queues[priority].Enqueue(invocation);
        semaphore.Release(1);
    }
}
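A hypothetical usage of the `Scheduler` class above (the `QueueRequest` wrapper and `Priority` values are illustrative, not part of the answer), combining it with the TaskCompletionSource pattern from the comments so that a result-returning API call can be awaited by the caller:

```csharp
var scheduler = new Scheduler(degreeOfParallelism: 3);

Task<string> QueueRequest(Func<Task<string>> apiCall, Priority priority)
{
    var tcs = new TaskCompletionSource<string>();
    scheduler.Invoke(async () =>
    {
        // Run the wrapped call on the scheduler and surface its outcome.
        try { tcs.SetResult(await apiCall()); }
        catch (Exception ex) { tcs.SetException(ex); }
    }, priority);
    return tcs.Task;
}
```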
svick
  • I'm starting to think the same thing here, as relying on schedulers to do the heavy lifting may be a bit too much of an ask. Currently this code is part of a lesser-priority improvement to our product, but I will be sure to explore both @Stephen's and your approach. I will reply in due time with the approach that works most effectively when I get the opportunity to finish it. – Daniel Park Jun 16 '15 at 19:39
  • Marked as answer, as this was the approach I took. I used a queue, and a delegate pattern (no TPLD) to start the tasks. – Daniel Park Oct 27 '15 at 22:32