3

I am developing a proof of concept app that factors a list of numbers using tasks and a semaphore, currently I have a List of Tasks,List<Task>, that take a FactorNumberClass and then calculate the factors of the specific number within the FactorNumberClass this currently works correctly. With each Task T, I have a ContinueWith task that updates the progress of the total numbers factored, the average time for factoring, and updates a progress bar with the value of (Numbers successfully factored)/(Total numbers to be factored). When factoring these Tasks enter a SemaphoreSlim.Wait(cancelToken) that limits the current factoring to 5 active Tasks. Lastly I have a ContinueWhenAll that logs when all tasks have completed. Assuming no cancellation, this all works as I intend.

The problem arises when I attempt to cancel the Tasks, I can not detect whether or not the task has been cancelled and therefore can not make an accurate determination on whether the number has been successfully factored or if it was cancelled. How can I detect whether or not the parent task was cancelled or ran to completion?

Cancellation Token Definition:

public static CancellationTokenSource tokenSource = new CancellationTokenSource();
public static CancellationToken ct = tokenSource.Token;

Factor Class Code:

public class FactorNumberClass
{
    public FactorNumberClass()
    {
    }

    public FactorNumberClass(int num, int threadnum)
    {
        this.number = num;
        this.threadNumber = threadnum;
    }

    public List<int> factors = new List<int>();
    public int number;
    public int max;
    public int threadNumber;
}

Factoring Method:

public void Factor(FactorNumberClass F, CancellationToken token)
        {
            LogtoStatusText("Thread: " + F.threadNumber + " Trying to enter semaphore");

            try
            {
                ASemaphore.Wait(ct);

                F.max = (int)Math.Sqrt(F.number);  //round down

                for (int factor = 1; factor <= F.max; ++factor)
                { //test from 1 to the square root, or the int below it, inclusive.
                    if (F.number % factor == 0)
                    {
                        F.factors.Add(factor);
                        if (factor != F.number / factor)
                        {
                            F.factors.Add(F.number / factor);
                        }
                    }
                }

                F.factors.Sort();
                Thread.Sleep(F.number * 300);
                LogtoStatusText("Task: " + F.threadNumber + " Completed - Factors: " + string.Join(",", F.factors.ToArray()));
                LogtoStatusText("Thread: " + F.threadNumber + " Releases semaphore with previous count: " + ASemaphore.Release());
            }
            catch (OperationCanceledException ex)
            {
                LogtoStatusText("Thread: " + F.threadNumber + " Cancelled.");
            }
            finally
            {
            }
        }

Method that starts the processing:

public void btnStart_Click(object sender, RoutedEventArgs e)
        {
            Task T;
            List<Task> TaskList = new List<Task>();
            LogtoStatusText("**** Begin creating tasks *****");
            s1.Start();

            AProject.FactorClassList.ForEach((f) =>
            {
                T = new Task(((x) => { OnUIThread(() => { RunningTasks++; }); Factor(f, ct); }), ct);

                T.ContinueWith((y) =>
                {
                    if (y.IsCompleted)
                    {
                        AProject.TotalProcessedAccounts++;
                        AProject.AverageProcessTime = (((Double)AProject.TotalProcessedAccounts / s1.ElapsedMilliseconds) * 1000);
                    }
                    OnUIThread(() => { RunningTasks--; });
                    OnUIThread(() => { UpdateCounts(AProject); });
                });

                TaskList.Add(T);
            });

            try
            {
                Task.Factory.ContinueWhenAll(TaskList.ToArray(), (z) => { LogtoStatusText("**** Completed all Tasks *****"); OnUIThread(() => { UpdateCounts(AProject); }); });
            }
            catch (AggregateException a)
            {
                // For demonstration purposes, show the OCE message.
                foreach (var v in a.InnerExceptions)
                    LogtoStatusText("msg: " + v.Message);
            }

            LogtoStatusText("**** All tasks have been initialized, begin processing *****");
            TaskList.ForEach(t => t.Start());
        }
Jochen van Wylick
  • 5,303
  • 4
  • 42
  • 64
Pseudonym
  • 2,052
  • 2
  • 17
  • 38

3 Answers3

2

Release the semaphore in the finally block so that it is always properly released. No need to detect cancellation.

Also, side-effects buried in log messages are not good style:

LogtoStatusText("..." + ASemaphore.Release());

I only found this through text search. Would have never noticed the mistake otherwise.

usr
  • 168,620
  • 35
  • 240
  • 369
  • Good catch, the LogtoStatusText is just an output for viewing what the heck is going on with the app, but you are entirely correct – Pseudonym May 07 '15 at 12:59
  • Also if the cancellation has been requested I do not want the existing waiting tasks to enter the semaphore, which they will if I do not use Wait(cancellationToken) – Pseudonym May 07 '15 at 13:01
  • I realize that this answer might not actually help you resolve the problem. Consider letting the cancellation exception bubble up out of `Factor` using `throw;`. That way cancellation is propagated. Also, consider using await in a few places to simplify logic. Some things are heard to read right now. await is especially good as transitioning back to the UI thread automatically. – usr May 07 '15 at 13:04
  • In addition to that using the Finally to release the semaphore will release it every single time even after the cancellation token (ct) throws a cancellation event, this will eventually increment the semaphore count to over the maximum – Pseudonym May 07 '15 at 13:05
  • Can't use await I am stuck in VS2010 and .NET 4.0 – Pseudonym May 07 '15 at 13:06
1

Using a cancellation token:

using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
    static void Main()
    {
        var tokenSource2 = new CancellationTokenSource();
        CancellationToken ct = tokenSource2.Token;

        var task = Task.Factory.StartNew(() =>
        {

            // Were we already canceled?
            ct.ThrowIfCancellationRequested();

            bool moreToDo = true;
            while (moreToDo)
            {
                // Poll on this property if you have to do 
                // other cleanup before throwing. 
                if (ct.IsCancellationRequested)
                {
                    // Clean up here, then...
                    ct.ThrowIfCancellationRequested();
                }

            }
        }, tokenSource2.Token); // Pass same token to StartNew.

        tokenSource2.Cancel();

        // Just continue on this thread, or Wait/WaitAll with try-catch: 
        try
        {
            task.Wait();
        }
        catch (AggregateException e)
        {
            foreach (var v in e.InnerExceptions)
                Console.WriteLine(e.Message + " " + v.Message);
        }
        finally
        {
            tokenSource2.Dispose();
        }

        Console.ReadKey();
    }
}

https://msdn.microsoft.com/en-us/library/dd997396%28v=vs.110%29.aspx

Oscar
  • 13,594
  • 8
  • 47
  • 75
  • How does this tie in with a semaphore and a ContinueWith? I have seen this article before but I don't think it addresses what I am looking for – Pseudonym May 07 '15 at 13:03
0

I finally found the solution I was looking for that would allow me to launch (Start()) all of my Task objects, run them through a semaphoreslim, observe a CancellationToken, and then detect if the Task was cancelled or had completed normally. In this case a Task would only "complete normally" if it had entered the semaphore and begun processing before the CancellationTokenSource.Cancel() was fired.

This answer: Elegantly handle task cancellation pushed me in the correct direction. I ended up catching the OperationCancelledException, logging it, and then re-throwing it, to be examined within the ContinueWith Task

Here is the updated code which solved my issue

Factor Class:

private void Factor(FactorNumberClass F)
        {


            LogtoStatusText("Thread: " + F.threadNumber + " Trying to enter semaphore");

            try
            {
                ASemaphore.Wait(ct);

                F.max = (int)Math.Sqrt(F.number);  //round down

                for (int factor = 1; factor <= F.max; ++factor)
                { //test from 1 to the square root, or the int below it, inclusive.
                    if (F.number % factor == 0)
                    {
                        F.factors.Add(factor);
                        if (factor != F.number / factor)
                        {
                            F.factors.Add(F.number / factor);
                        }
                    }
                }

                F.factors.Sort();

                Thread.Sleep(F.number * 300);

                LogtoStatusText("Task: " + F.threadNumber + " Completed - Factors: " + string.Join(",", F.factors.ToArray()));

                LogtoStatusText("Thread: " + F.threadNumber + " Releases semaphore with previous count: " + ASemaphore.Release());
            }
            catch
            {
                LogtoStatusText("Thread: " + F.threadNumber + " Cancelled");
                throw;

            }
            finally
            {

            }

        }

Processing Methods:

public void btnStart_Click(object sender, RoutedEventArgs e)
{
    LaunchTasks();
}

private void LaunchTasks()
        {
            Task T;
            List<Task> TaskList = new List<Task>();

            LogtoStatusText("**** Begin creating tasks *****");

            s1.Start();

            AProject.FactorClassList.ForEach((f) =>
            {
                T = new Task(((x) => { OnUIThread(() => { RunningTasks++; }); Factor(f); }), ct);

                T.ContinueWith((y) =>
                {
                    if (y.Exception != null)
                    {
                        // LogtoStatusText(y.Status + " with "+y.Exception.InnerExceptions[0].GetType()+": "+ y.Exception.InnerExceptions[0].Message);
                    }
                    if (!y.IsFaulted)
                    {

                        AProject.TotalProcessedAccounts++;
                        AProject.AverageProcessTime = (((Double)AProject.TotalProcessedAccounts / s1.ElapsedMilliseconds) * 1000);
                    }
                    OnUIThread(() => { RunningTasks--; });
                    OnUIThread(() => { UpdateCounts(AProject); });


                });

                TaskList.Add(T);
            });

            try
            {
                Task.Factory.ContinueWhenAll(TaskList.ToArray(), (z) => { LogtoStatusText("**** Completed all Tasks *****"); OnUIThread(() => { UpdateCounts(AProject); }); });
            }
            catch (AggregateException a)
            {
                // For demonstration purposes, show the OCE message.
                foreach (var v in a.InnerExceptions)
                    LogtoStatusText("msg: " + v.Message);
            }

            LogtoStatusText("**** All tasks have been initialized, begin processing *****");

            TaskList.ForEach(t => t.Start());
        }
Pseudonym
  • 2,052
  • 2
  • 17
  • 38