
What I'm trying to implement:

A task scheduler that enqueues tasks and runs a specified number of them in parallel while the others wait in the queue. Each task has a timeout that starts counting when the task starts running; if the task exceeds it, the task is canceled and throws a TimeoutException, which is handled in ContinueWith (or in some task that runs immediately afterwards). Tasks should also be cancelable by the user.

What I get:

When the first task fails, all the others fail instantly too.

Here is the full code of my Task Scheduler (taken from MSDN with some modifications):

http://pastebin.com/KSMbDTH5. (the function in question is on line 161)

Here is example usage:

var taskTokens = new List<CancellationToken>();
var factory = new TaskScheduleFactory(new ParallelOptions() { MaxDegreeOfParallelism = 1 }); // 1 for testing purposes; should also work with higher values
for (var i = 0; i < 10; i++)
{
    //TaskScheduleFactory.cs, line 161
    var taskToken = factory.Add(
        (token) => //Task
        {
            Console.WriteLine("Start");
            Thread.Sleep(5000);
            if (token.IsCancellationRequested) return; //Cancelled by timeout
            Console.WriteLine("This should not print");
        },
        (task) => //ContinueWith
        {
            if (task.IsFaulted)
            {
                Console.WriteLine("Fail");
            }
            else if (!task.IsCompleted)
            {
                Console.WriteLine("Not completed");
            }
            else Console.WriteLine("Done");
        },
        2000 //Timeout
    );
    taskTokens.Add(taskToken);
}

How it is supposed to work (we force a timeout after 2 seconds, so no task should run to completion):

For MaxDegreeOfParallelism = 1:

Start;
(Wait 2sec)
Fail;
Start;
(Wait 2sec)
Fail;
....

For MaxDegreeOfParallelism = 2:

Start;
Start;
(Wait 2sec)
Fail;
Fail;
Start;
Start;
(Wait 2sec)
Fail;
Fail;
....

How it's working:

Start;
(Wait 2sec)
Fail;
Fail;
Fail;
Fail;
...

(for MaxDegreeOfParallelism = 1; higher values are a mess too)

Note: These are my first steps with the TPL, so excuse any stupidity on my part.

svick
Arvigeus

2 Answers


In Add you call .TimeoutAfter(timeoutInMilliseconds, cts). This means that the timeout starts as soon as you create the task. I think your intention was to start the timeout only when the task starts executing.

Since the first task takes longer than the timeout, all the other tasks have already timed out by the time they get their turn.
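To illustrate the point, here is a minimal sketch of arming the timeout inside the task body, so the clock starts only when the scheduler actually runs the delegate. `TimeoutAtStartSketch` and `Run` are hypothetical names, not part of the question's `TaskScheduleFactory`, and the sketch assumes the work cooperates by checking the token it receives.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutAtStartSketch
{
    // Hypothetical helper: queues the action on the given factory and
    // starts the timeout only once the delegate begins executing.
    public static Task Run(TaskFactory factory, Action<CancellationToken> action, int timeoutInMilliseconds)
    {
        // Not disposed here for brevity; real code should dispose it once the work is done.
        var cts = new CancellationTokenSource();
        return factory.StartNew(() =>
        {
            // Time spent waiting in the queue is not counted:
            // the timer is armed here, not when the task is created.
            cts.CancelAfter(timeoutInMilliseconds);
            action(cts.Token);
        }, cts.Token);
    }
}
```

With a factory bound to a limited-concurrency scheduler, each queued task would then get its full timeout from the moment it starts, rather than sharing one countdown that began at creation time.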

usr
  • Good point. I moved TimeoutAfter and ContinueWith into action's body but still struggle to make it work right. Could you elaborate more on how to fix it? – Arvigeus Mar 02 '16 at 16:37
  • You have written some pretty advanced code there. I thought that would be in reach for you to fix :) Something like that should work: `var workTask = Instance.StartNew(() => { cts.CancelAfter(timeoutInMilliseconds); action(cts.Token); }, cts.Token); var resultTask = Task.WhenAll(workTask, cts.Token);`. This might complete the `resultTask` in a state that you don't like (it will end up cancelled I think). If you want more control you can achieve any completion behavior with `TaskCompletionSource`. – usr Mar 02 '16 at 16:45
  • Hehe, I mentioned earlier that I copied most of the code from the MSDN example on the topic. The only part I wrote is the one that is not actually working. Thanks for the help! I'll be away for a few days, so it will take some time to verify it and mark it as the answer (unless my stupidity prevents me from implementing it correctly). Cheers and have a great week! – Arvigeus Mar 02 '16 at 19:24
  • Something's not right. For starters, Task.WhenAll does not accept a CancellationToken as its second parameter. And I can't see ContinueWith. Sorry for being so sassy... – Arvigeus Mar 04 '16 at 10:46
  • OK, I'm not 100% familiar with that area of the TPL. This looks good: http://stackoverflow.com/a/27240225/122718 – usr Mar 04 '16 at 12:38
  • I see your point. Unfortunately, this way the task is not removed from the active tasks and the TaskScheduler is blocked for new tasks. I think I found a solution using an inline task (not sure how correct it is to use this): `Instance.StartNew(() => { cts.CancelAfter(timeoutInMilliseconds); Task.Factory.StartNew(() => action(cts.Token), cts.Token, TaskCreationOptions.AttachedToParent|TaskCreationOptions.LongRunning, TaskScheduler.Default); }, cts.Token);` – Arvigeus Mar 04 '16 at 16:30
  • That should work. I think attached tasks are quite an awkward pattern so I would always try to construct task dependencies explicitly. Another idea: You don't need a TaskScheduler at all I think. You can make tasks enter a `SemaphoreSlim`. I find that this approach of scheduling tasks is often easier and it works with async tasks which are not purely CPU based. – usr Mar 04 '16 at 16:40
  • `SemaphoreSlim` looks like a good alternative, but it is not suitable for my case because it doesn't guarantee the order of execution and could eventually lead to task starvation (in my case this is highly plausible). REALLY THANK YOU for your time and effort! Unfortunately I'll have to answer my own question (so that anyone who stumbles on this topic later can see a complete solution) – Arvigeus Mar 05 '16 at 13:05
  • Alright, I'll take a look and maybe give feedback to that answer. – usr Mar 05 '16 at 13:17
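
For readers curious about the `SemaphoreSlim` idea from the comments, here is a rough sketch of how it might look. `ThrottledRunner` and `RunAsync` are hypothetical names, not from the question's code. As noted in the thread, a semaphore does not promise that waiters are admitted in the order they arrived, so this only fits cases where ordering does not matter.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class ThrottledRunner
{
    private readonly SemaphoreSlim _gate;

    public ThrottledRunner(int maxDegreeOfParallelism)
    {
        // The semaphore count caps how many actions run at once.
        _gate = new SemaphoreSlim(maxDegreeOfParallelism);
    }

    public async Task RunAsync(Action<CancellationToken> action, int timeoutInMilliseconds,
                               CancellationToken userToken = default(CancellationToken))
    {
        // Wait for a free slot; the timeout has not started yet.
        await _gate.WaitAsync(userToken).ConfigureAwait(false);
        try
        {
            // Link the user's token so either the user or the timeout can cancel.
            using (var cts = CancellationTokenSource.CreateLinkedTokenSource(userToken))
            {
                // The timeout starts only after the slot has been acquired.
                cts.CancelAfter(timeoutInMilliseconds);
                await Task.Run(() => action(cts.Token), cts.Token).ConfigureAwait(false);
            }
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

The returned task can be followed by `ContinueWith` or awaited inside a try/catch to react to faults and cancellation, which would take the place of the callback parameter in this sketch.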

Not the perfect solution, but the best that I could think of:

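public CancellationTokenSource Add(Action<CancellationToken> action, Action<Task> callback, int timeoutInMilliseconds)
{
    var cts = new CancellationTokenSource();
    Instance.StartNew(() =>
    {
        // Start the timeout only now, when the scheduler actually runs this task
        cts.CancelAfter(timeoutInMilliseconds);
        // Run the work on the default scheduler; the custom one would just enqueue it again
        var task = Task.Factory.StartNew(() => action(cts.Token), cts.Token, TaskCreationOptions.AttachedToParent | TaskCreationOptions.LongRunning, TaskScheduler.Default);
        try
        {
            task.Wait(timeoutInMilliseconds, cts.Token);
        }
        catch (OperationCanceledException) { } // timeout or user cancellation
        catch (AggregateException) { } // action threw; the callback sees it via task.IsFaulted
        callback(task);
    }, cts.Token);
    return cts;
}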

In short: when a task is started from the queue, it creates a child task on the default task scheduler (because the custom one would just enqueue it again), waits for up to the timeout, and then calls the callback function. Not sure how bad this is.

Arvigeus