I am creating a generic helper class that prioritises requests made to an API whilst restricting the degree of parallelism with which they run.
Consider the key method of the application below:
    public IQueuedTaskHandle<TResponse> InvokeRequest<TResponse>(Func<TClient, Task<TResponse>> invocation, QueuedClientPriority priority, CancellationToken ct) where TResponse : IServiceResponse
    {
        var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        _logger.Debug("Queueing task.");
        var taskToQueue = Task.Factory.StartNew(async () =>
        {
            _logger.Debug("Starting request {0}", Task.CurrentId);
            return await invocation(_client);
        }, cts.Token, TaskCreationOptions.None, _schedulers[priority]).Unwrap();
        taskToQueue.ContinueWith(task => _logger.Debug("Finished task {0}", task.Id), cts.Token);
        return new EcosystemQueuedTaskHandle<TResponse>(cts, priority, taskToQueue);
    }
Without going into too many details, I want to invoke the tasks returned by the Func<TClient, Task<TResponse>> invocation delegate when their turn in the queue arises. I am using a collection of queues constructed with QueuedTaskScheduler, indexed by an enumeration of priorities:
    _queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 3);
    _schedulers = new Dictionary<QueuedClientPriority, TaskScheduler>();
    // Enumerate the priorities
    foreach (var priority in Enum.GetValues(typeof(QueuedClientPriority)))
    {
        _schedulers.Add((QueuedClientPriority)priority, _queuedTaskScheduler.ActivateNewQueue((int)priority));
    }
However, I have had little success getting the tasks to execute with limited parallelism: 100 API requests are constructed, fired, and completed in one big batch. I can see this in a Fiddler session.
I have read some interesting articles and SO posts (here, here and here) that I thought would detail how to go about this, but so far I have not been able to figure it out. From what I understand, the async nature of the lambda is working in a continuation structure as designed, which marks the generated task as complete as far as the scheduler is concerned, basically "insta-completing" it. This means that whilst the queues are working fine, running a generated Task<T> on a custom scheduler is turning out to be the problem.
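To make the suspected "insta-completing" behaviour concrete, here is a minimal sketch that reproduces it in isolation. It is not the code from the helper class above: it assumes the BCL's ConcurrentExclusiveSchedulerPair as a stand-in for QueuedTaskScheduler, uses Task.Delay in place of a real API call, and the names InFlightDemo and MeasureMaxInFlightAsync are made up for the illustration. The idea is that a scheduler only occupies a slot while the lambda runs synchronously, so the slot is handed back at the first await and the next request starts straight away.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original helper.
public static class InFlightDemo
{
    // Starts `requests` async lambdas on a scheduler limited to `maxConcurrency`
    // and returns the highest number of simulated requests in flight at once.
    public static async Task<int> MeasureMaxInFlightAsync(int requests, int maxConcurrency, int delayMs)
    {
        // Assumption: a BCL concurrency-limited scheduler standing in for QueuedTaskScheduler.
        var scheduler = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrency).ConcurrentScheduler;

        int inFlight = 0, maxInFlight = 0;
        object gate = new object();

        var tasks = Enumerable.Range(0, requests).Select(_ =>
            Task.Factory.StartNew(async () =>
            {
                lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
                // Simulated API call. The lambda returns to the scheduler here,
                // so the scheduler slot is free while the "request" is pending.
                await Task.Delay(delayMs);
                lock (gate) { inFlight--; }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap()).ToArray();

        await Task.WhenAll(tasks);
        return maxInFlight;
    }

    public static void Main()
    {
        int max = MeasureMaxInFlightAsync(10, 3, 500).GetAwaiter().GetResult();
        Console.WriteLine("Scheduler limit: 3, max requests in flight: " + max);
    }
}
```

With a limit of 3 and ten simulated requests, the reported maximum is typically well above 3, which would be consistent with the single large batch seen in Fiddler. Whether QueuedTaskScheduler behaves identically is an assumption here, but any TaskScheduler is only handed the synchronous segments of an async lambda.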