1

There is a need to periodically execute some operations. Major requirements are
1) don't start the next update cycle until the previous one has finished
2) don't start update if data obtained in previous iteration is still valid, i.e. time since last refresh is smaller than TTL value
3) there are separate (say >10) threads required to do such updates.
There are a lot of questions of same kind on SO, so I've found this implementation of PeriodicTaskFactory here by @Jim.
It works as expected, but when I spawn multiple such factories concurrently, I start to see some overhead during refreshes that distorts the whole process (a few iterations that were about to happen get cancelled). Here's the code:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace CA_TasksTests
{
    class Program
    {
        /// <summary>
        /// Outcome of a single refresh: "#", the iteration number, then the text passed as info.
        /// </summary>
        public class Result
        {
            public string Message { get; set; }

            public Result(int iter, string info)
            {
                Message = "#" + iter + info;
            }

            public override string ToString()
            {
                return Message;
            }
        }

        /// <summary>
        /// Parameters and state of one periodically refreshed operation.
        /// </summary>
        public class Operation
        {
            public string OperationName { get; set; }

            /// <summary>Validity period of refreshed data; always equal to <see cref="Interval"/> milliseconds.</summary>
            public TimeSpan TTL { get { return TimeSpan.FromMilliseconds(Interval); } }

            /// <summary>Timestamp written at the start of the most recent refresh.</summary>
            public DateTime LastUpdate { get; set; }

            public Operation(int id)
            {
                // Ids below 10 get a leading "0" (Operation01 ... Operation09).
                OperationName = "Operation" + ((id < 10) ? "0" : "") + id;
            }
        }

        /// <summary>Timer period and TTL of every operation, in milliseconds.</summary>
        public static int Interval = 2000;

        /// <summary>Total time each periodic task is allowed to run, in milliseconds.</summary>
        public static int Duration = 10000;

        /// <summary>Number of test operations created by <see cref="Main"/>.</summary>
        public static int OperationsCount = 10;

        static void Main()
        {
            // Creating 10 test operations
            var operations = Enumerable.Range(1, OperationsCount).Select(i => new Operation(i)).ToList();
            // Executing them
            var r = ExecuteActions(operations).OrderBy(i => i.Message).ToList();
            Console.WriteLine("Results (expected=" + (Duration/Interval*OperationsCount) + ") : " + r.Count);
            Console.ReadLine();
        }

        /// <summary>
        /// Performs one refresh of <paramref name="operation"/> and returns its result.
        /// </summary>
        /// <param name="iter">Refresh counter of the operation; becomes part of the result message.</param>
        /// <param name="operation">Operation to refresh; its LastUpdate is set to the current time.</param>
        /// <returns>A <see cref="Result"/> built from the iteration number and the operation name.</returns>
        public static Result ExecuteOperation(int iter, Operation operation)
        {
            // Assigning last update timestamp
            operation.LastUpdate = DateTime.Now;

            // Some operation. It runs directly on the calling thread: the caller is already a
            // worker task, so starting a second task only to block on its Result would tie up
            // an extra thread without adding any concurrency.
            Thread.Sleep(1000);
            return new Result(iter, operation.OperationName);
        }

        /// <summary>
        /// Starts one periodic task per operation, waits until all of them have finished and
        /// returns every result that was produced.
        /// </summary>
        /// <param name="operations">Operations to refresh periodically.</param>
        /// <returns>All refresh results, in the order in which they were recorded.</returns>
        public static List<Result> ExecuteActions(List<Operation> operations)
        {
            var list = new List<Result>();
            // Guards 'list': callbacks of different operations run concurrently and
            // List<T> does not support concurrent writers.
            var listLock = new object();
            var tasks = new ConcurrentBag<Task>();
            foreach (var currentOperation in operations)
            {
                var iter = 0;
                // Per-operation lock: prevents overlapping refreshes of the same operation.
                var locker = new object();
                Operation operation = currentOperation;
                var periodicTask = PeriodicTaskFactory.Start(() =>
                                {
                                    // (*) Looking if we need updates semantically - 
                                    // through comparing time since last refresh with operation TTL
                                    Console.WriteLine(DateTime.Now + " : " + (DateTime.Now - operation.LastUpdate) + " ?> " + operation.TTL);
                                    // Looking if we need updates logically -
                                    // if previous operation is still running
                                    if (!Monitor.TryEnter(locker))
                                    {
                                        Console.WriteLine(">>>" + DateTime.Now + " Cancelled");
                                        return;
                                    }
                                    try
                                    {
                                        // Semantic update.
                                        // NOTE: the period passed to PeriodicTaskFactory equals the TTL, so a
                                        // callback that runs slightly less than TTL after LastUpdate was written
                                        // fails this comparison and the refresh is skipped for that tick.
                                        if (DateTime.Now - operation.LastUpdate > operation.TTL)
                                        {
                                            iter++;
                                            Console.WriteLine(DateTime.Now + " Refresh #" + iter + " " + operation.OperationName);
                                            var result = ExecuteOperation(iter, operation);
                                            lock (listLock)
                                            {
                                                list.Add(result);
                                            }
                                        }
                                    }
                                    finally
                                    {
                                        Monitor.Exit(locker);
                                    }
                                }, intervalInMilliseconds: (int)operation.TTL.TotalMilliseconds, duration: Duration /*maxIterations:2*/);


                var end = periodicTask.ContinueWith(finished =>
                    {
                        finished.Dispose();
                        Console.WriteLine(">>>" + DateTime.Now + " " + operation.OperationName + " finished");
                    });
                tasks.Add(end);
            }
            Task.WaitAll(tasks.ToArray());
            return list;
        }
    }

    /// <summary>
    /// Factory class to create a periodic Task to simulate a <see cref="System.Threading.Timer"/> using <see cref="Task">Tasks.</see>
    /// </summary>
    public static class PeriodicTaskFactory
    {
        /// <summary>
        /// Starts the periodic task.
        /// </summary>
        /// <param name="action">The action.</param>
        /// <param name="intervalInMilliseconds">The interval in milliseconds. <see cref="Timeout.Infinite"/> executes a single iteration.</param>
        /// <param name="delayInMilliseconds">The delay in milliseconds, i.e. how long it waits to kick off the timer.</param>
        /// <param name="duration">The duration.
        /// <example>If the duration is set to 10 seconds, the maximum time this task is allowed to run is 10 seconds.</example></param>
        /// <param name="maxIterations">The max iterations. A negative value means "not specified", zero means "do not run at all".</param>
        /// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
        /// is included in the total duration of the Task.</param>
        /// <param name="cancelToken">The cancel token.</param>
        /// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create the task for executing the <see cref="Action"/>.</param>
        /// <returns>A <see cref="Task"/></returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        /// <remarks>
        /// Exceptions that occur in the <paramref name="action"/> need to be handled in the action itself. In synchronous
        /// mode they are swallowed by the periodic loop. NOTE(review): in asynchronous mode the sub tasks are created with
        /// <see cref="TaskCreationOptions.AttachedToParent"/>, so their exceptions may surface on the returned task - confirm.
        /// </remarks>
        public static Task Start(Action action,
                                 int intervalInMilliseconds = Timeout.Infinite,
                                 int delayInMilliseconds = 0,
                                 int duration = Timeout.Infinite,
                                 int maxIterations = -1,
                                 bool synchronous = false,
                                 CancellationToken cancelToken = new CancellationToken(),
                                 TaskCreationOptions periodicTaskCreationOptions = TaskCreationOptions.None)
        {
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }

            Stopwatch stopWatch = new Stopwatch();
            Action wrapperAction = () =>
            {
                CheckIfCancelled(cancelToken);
                action();
            };

            Action mainAction = () =>
            {
                MainPeriodicTaskAction(intervalInMilliseconds, delayInMilliseconds, duration, maxIterations, cancelToken, stopWatch, synchronous, wrapperAction, periodicTaskCreationOptions);
            };

            return Task.Factory.StartNew(mainAction, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
        }

        /// <summary>
        /// Body of the periodic task: runs the wrapped action, waits one interval and repeats until the
        /// iteration cap or the duration is reached, or cancellation is requested.
        /// </summary>
        /// <param name="intervalInMilliseconds">The interval in milliseconds.</param>
        /// <param name="delayInMilliseconds">The delay in milliseconds.</param>
        /// <param name="duration">The duration.</param>
        /// <param name="maxIterations">The max iterations.</param>
        /// <param name="cancelToken">The cancel token.</param>
        /// <param name="stopWatch">The stop watch.</param>
        /// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
        /// is included in the total duration of the Task.</param>
        /// <param name="wrapperAction">The wrapper action.</param>
        /// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create a sub task for executing the <see cref="Action"/>.</param>
        private static void MainPeriodicTaskAction(int intervalInMilliseconds,
                                                   int delayInMilliseconds,
                                                   int duration,
                                                   int maxIterations,
                                                   CancellationToken cancelToken,
                                                   Stopwatch stopWatch,
                                                   bool synchronous,
                                                   Action wrapperAction,
                                                   TaskCreationOptions periodicTaskCreationOptions)
        {
            // When both a duration and an interval are given, the number of intervals that fit into
            // the duration caps the iteration count. The division is guarded so that a zero interval
            // cannot throw DivideByZeroException, and an explicit, smaller maxIterations supplied by
            // the caller is honoured instead of being overwritten.
            if (duration > 0 && intervalInMilliseconds > 0)
            {
                int iterationsInDuration = duration / intervalInMilliseconds;
                if (iterationsInDuration > 0)
                {
                    if (maxIterations < 0)
                    {
                        maxIterations = iterationsInDuration;
                    }
                    else if (maxIterations > 0)
                    {
                        maxIterations = Math.Min(maxIterations, iterationsInDuration);
                    }
                }
            }

            TaskCreationOptions subTaskCreationOptions = TaskCreationOptions.AttachedToParent | periodicTaskCreationOptions;

            CheckIfCancelled(cancelToken);

            if (delayInMilliseconds > 0)
            {
                Thread.Sleep(delayInMilliseconds);
            }

            if (maxIterations == 0) { return; }

            int iteration = 0;

            ////////////////////////////////////////////////////////////////////////////
            // using a ManualResetEventSlim as it is more efficient in small intervals.
            // In the case where longer intervals are used, it will automatically use 
            // a standard WaitHandle....
            // see http://msdn.microsoft.com/en-us/library/vstudio/5hbefs30(v=vs.100).aspx
            using (ManualResetEventSlim periodResetEvent = new ManualResetEventSlim(false))
            {
                ////////////////////////////////////////////////////////////
                // Main periodic logic. Basically loop through this block
                // executing the action
                while (true)
                {
                    CheckIfCancelled(cancelToken);

                    Task subTask = Task.Factory.StartNew(wrapperAction, cancelToken, subTaskCreationOptions, TaskScheduler.Current);

                    if (synchronous)
                    {
                        // In synchronous mode the execution time counts towards the duration.
                        stopWatch.Start();
                        try
                        {
                            subTask.Wait(cancelToken);
                        }
                        catch { /* do not let an errant subtask to kill the periodic task...*/ }
                        stopWatch.Stop();
                    }

                    // use the same Timeout setting as the System.Threading.Timer, infinite timeout will execute only one iteration.
                    if (intervalInMilliseconds == Timeout.Infinite) { break; }

                    iteration++;

                    if (maxIterations > 0 && iteration >= maxIterations) { break; }

                    try
                    {
                        // The event is never set, so this is a cancellable sleep of one interval.
                        stopWatch.Start();
                        periodResetEvent.Wait(intervalInMilliseconds, cancelToken);
                        stopWatch.Stop();
                    }
                    finally
                    {
                        periodResetEvent.Reset();
                    }

                    CheckIfCancelled(cancelToken);

                    if (duration > 0 && stopWatch.ElapsedMilliseconds >= duration) { break; }
                }
            }
        }

        /// <summary>
        /// Throws <see cref="OperationCanceledException"/> if cancellation has been requested.
        /// </summary>
        /// <param name="cancellationToken">The cancel token.</param>
        private static void CheckIfCancelled(CancellationToken cancellationToken)
        {
            // CancellationToken is a struct and can never be null, so no null check is needed.
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
}

The output of the TTL comparison check (*) shows:

9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.0020000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:00.9940000 >? 00:00:02

So a few of the updates get cancelled because of this overhead. What might be causing this, and how can it be fixed? My first guess is the cost of thread switching, which would mean adding some epsilon to that comparison and living with it. Thanks for the help.

Community
  • 1
  • 1
Jaded
  • 1,802
  • 6
  • 25
  • 38

1 Answer

3

That's a pretty complicated way to do things. I'd suggest a different route using a System.Threading.Timer. There's no need for a lock, and operations can run concurrently. Also, you can have different timings for each update.

To prevent reentrant updates (i.e. FooUpdate triggering again while the previous FooUpdate is running), you create a one-shot timer that you reinitialize after each update. So your timer looks like this:

// One-shot timer: fires once after 2 seconds and never repeats on its own
// (an infinite period disables periodic signaling).
System.Threading.Timer FooUpdateTimer = new System.Threading.Timer(
    FooUpdate, null, TimeSpan.FromSeconds(2), System.Threading.Timeout.InfiniteTimeSpan);

And your FooUpdate looks like this:

// Time at which the last Foo update completed; MinValue forces an update on the first tick.
DateTime LastFooUpdate = DateTime.MinValue;

// Timer callback: refreshes the data if it is stale, then re-arms the one-shot timer.
// Because the timer is re-armed only at the end, two invocations never overlap.
void FooUpdate(object state)
{
    // check data freshness
    if ((DateTime.UtcNow - LastFooUpdate) > SomeMinimumTime)
    {
        // do update
        // and reset last update time
        LastFooUpdate = DateTime.UtcNow;
    }
    // then, reset the timer
    FooUpdateTimer.Change(TimeSpan.FromSeconds(2), System.Threading.Timeout.InfiniteTimeSpan);
}

If you want a BarUpdate that runs every 10 seconds, you duplicate the code above with a 10 second update time. That is:

// One-shot timer for the Bar update: fires once after 10 seconds.
System.Threading.Timer BarUpdateTimer = new System.Threading.Timer(
    BarUpdate, null, TimeSpan.FromSeconds(10), System.Threading.Timeout.InfiniteTimeSpan);

DateTime LastBarUpdate = DateTime.MinValue;

// Timer callback for the Bar update.
void BarUpdate(object state)
{
    // ... same pattern as FooUpdate: check freshness against LastBarUpdate, do the update,
    // then re-arm BarUpdateTimer for another 10 seconds ...
}

That's fine if you only have one or two of those. If you expect to have a bunch of them, then wrap that functionality into a class. Let's see . . .

/// <summary>
/// Runs an update action from a one-shot timer that is re-armed after every tick, so two
/// ticks never overlap. The action is invoked only when the time since the previous update
/// is at least the configured freshness period.
/// </summary>
class PeriodicUpdater
{
    private readonly System.Threading.Timer _timer;
    private readonly TimeSpan _interval;
    private DateTime _lastUpdateTime = DateTime.MinValue;
    private readonly Action _updateAction;
    private readonly TimeSpan _freshness;

    /// <param name="updateAction">Work to perform when the data is stale.</param>
    /// <param name="interval">Time between the end of one tick and the start of the next.</param>
    /// <param name="freshness">Minimum age of the last update before the action runs again.</param>
    public PeriodicUpdater(Action updateAction, TimeSpan interval, TimeSpan freshness)
    {
        _interval = interval;
        _updateAction = updateAction;
        _freshness = freshness;
        // Create the timer disabled and arm it afterwards, so that a very short interval
        // cannot invoke TimerTick before _timer has been assigned.
        _timer = new Timer(TimerTick, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        _timer.Change(_interval, Timeout.InfiniteTimeSpan);
    }

    private void TimerTick(object state)
    {
        if ((DateTime.UtcNow - _lastUpdateTime) >= _freshness)
        {
            _updateAction();
            _lastUpdateTime = DateTime.UtcNow;
        }
        // Re-arm the one-shot timer only after the work is done.
        _timer.Change(_interval, Timeout.InfiniteTimeSpan);
    }
}

And to create one:

// Ticks every 2 seconds; runs FooUpdateAction only when the last update is at least 8 seconds old.
var FooUpdater = new PeriodicUpdater(
    updateAction: FooUpdateAction,
    interval: TimeSpan.FromSeconds(2.0),
    freshness: TimeSpan.FromSeconds(8.0));

// Ticks every 10 seconds; runs BarUpdateAction only when the last update is at least 15.5 seconds old.
var BarUpdater = new PeriodicUpdater(
    updateAction: BarUpdateAction,
    interval: TimeSpan.FromSeconds(10.0),
    freshness: TimeSpan.FromSeconds(15.5));

// Update action handed to the FooUpdater instance; placeholder for the real Foo refresh logic.
private void FooUpdateAction()
{
    // do the Foo update
}

// Update action handed to the BarUpdater instance; placeholder for the real Bar refresh logic.
private void BarUpdateAction()
{
    // do the Bar update
}

That should give you the basic idea.

Update: Cancellation

If you want to add support for cancellation, you pass a CancellationToken to the constructor, and register a callback. So the constructor becomes:

    /// <summary>
    /// Creates the updater and stops its timer when <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="updateAction">Work to perform when the data is stale.</param>
    /// <param name="interval">Time between the end of one tick and the start of the next.</param>
    /// <param name="freshness">Minimum age of the last update before the action runs again.</param>
    /// <param name="ct">Token whose cancellation invokes Cancel.</param>
    public PeriodicUpdater(Action updateAction, TimeSpan interval, 
        TimeSpan freshness, CancellationToken ct)
    {
        _interval = interval;
        _updateAction = updateAction;
        _freshness = freshness;
        // Create the timer disabled and arm it afterwards, so that a very short interval
        // cannot invoke TimerTick before _timer has been assigned.
        _timer = new Timer(TimerTick, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
        _timer.Change(_interval, Timeout.InfiniteTimeSpan);
        // Registered last: if the token is already cancelled, Cancel runs right here.
        ct.Register(Cancel);
    }

And you add the Cancel method:

    /// <summary>
    /// Cancellation callback: disables the timer and releases it.
    /// </summary>
    private void Cancel()
    {
        // Timeout.Infinite disables the timer; a due time of 0 would instead invoke the
        // callback one more time, immediately.
        // NOTE(review): a TimerTick that is already running may still call _timer.Change
        // after Dispose - confirm whether that needs handling in TimerTick.
        _timer.Change(Timeout.Infinite, Timeout.Infinite);
        _timer.Dispose();
    }
Jim Mischel
  • 131,090
  • 20
  • 188
  • 351
  • Indeed it looks simpler, but what I have while using tasks is simple token-ish cancellation support (eventually this looping has to end somehow, i.e. upon request) and more clear way of returning results. Also locking inside my piece doesn't look very expensive. I guess problem is somewhere else. Anyway thanks for your effort, good illustration to KISS principle. – Jaded Sep 23 '13 at 19:21
  • @Jaded: `CancellationToken` support is easy enough to add. See my update. – Jim Mischel Sep 23 '13 at 19:44