I'd like to run a Task that has a "heartbeat" that keeps running at a specific time interval until the task completes.

I'm thinking an extension method like this would work well:

public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken)

For example:

public class Program {
    public static void Main() {
        var cancelTokenSource = new CancellationTokenSource();
        var cancelToken = cancelTokenSource.Token;
        var longRunningTask = Task.Factory.StartNew(SomeLongRunningTask, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
        var withHeartbeatTask = longRunningTask.WithHeartbeat(TimeSpan.FromSeconds(1), PerformHeartbeat, cancelToken);
        withHeartbeatTask.Wait();
        Console.WriteLine("Long running task completed!");
        Console.ReadLine();
    }

    private static void SomeLongRunningTask() {
        Console.WriteLine("Starting long task");
        Thread.Sleep(TimeSpan.FromSeconds(9.5));
    }
    private static int _heartbeatCount = 0;
    private static void PerformHeartbeat(CancellationToken cancellationToken) {
        Console.WriteLine("Heartbeat {0}", ++_heartbeatCount);
    }
}

This program should output:

Starting long task
Heartbeat 1
Heartbeat 2
Heartbeat 3
Heartbeat 4
Heartbeat 5
Heartbeat 6
Heartbeat 7
Heartbeat 8
Heartbeat 9
Long running task completed!

Note that it should not (under normal circumstances) output "Heartbeat 10" since the heartbeat starts after the initial timeout (i.e. 1 second). Similarly, if the task takes less time than the heartbeat interval, the heartbeat should not occur at all.

What is a good way to implement this?

Background information: I have a service that's listening to an Azure Service Bus queue. I don't want to Complete the message (which would permanently remove it from the queue) until I finish processing it, and processing could take longer than the maximum message LockDuration of 5 minutes. Thus, I need this heartbeat approach to call RenewLockAsync before the lock duration expires, so that the message doesn't time out while lengthy processing is occurring.

Jeff Moser
  • This sounds similar to reporting progress in an async task (just that the thing triggering a report is a time interval, and there's no real progress to report, except maybe the heartbeat count). Do either of these links help? http://blogs.msdn.com/b/dotnet/archive/2012/06/06/async-in-4-5-enabling-progress-and-cancellation-in-async-apis.aspx http://stackoverflow.com/questions/15408148/c-sharp-async-await-progress-event-on-task-object – Tim S. Jun 14 '13 at 18:57
  • @TimS. They're similar but not quite what I want, especially with the case of never reporting if the task completes quickly. Also, the heartbeat doesn't know the progress per se. However, I'd be happy to see if you can implement the progress approach to match my extension API and have the same net effect with simpler code. – Jeff Moser Jun 14 '13 at 19:05

2 Answers

Here's my attempt:

public static class TaskExtensions {
    /// <summary>
    /// Issues the <paramref name="heartbeatAction"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running.
    /// </summary>
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) {
            return;
        }

        var stopHeartbeatSource = new CancellationTokenSource();
        cancellationToken.Register(stopHeartbeatSource.Cancel);

        await Task.WhenAny(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatAction, stopHeartbeatSource.Token));
        stopHeartbeatSource.Cancel();
    }
        
    private static async Task PerformHeartbeats(TimeSpan interval, Action<CancellationToken> heartbeatAction, CancellationToken cancellationToken) {
        while (!cancellationToken.IsCancellationRequested) {
            try {
                await Task.Delay(interval, cancellationToken);
                if (!cancellationToken.IsCancellationRequested) {
                    heartbeatAction(cancellationToken);
                }
            }
            catch (TaskCanceledException tce) {
                if (tce.CancellationToken == cancellationToken) {
                    // Totally expected
                    break;
                }
                throw;
            }
        }
    }
}

Or, with a slight tweak, you can even make the heartbeat async:

    /// <summary>
    /// Awaits a fresh Task created by the <paramref name="heartbeatTaskFactory"/> once every <paramref name="heartbeatInterval"/> while <paramref name="primaryTask"/> is running.
    /// </summary>
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) {
        if (cancellationToken.IsCancellationRequested) {
            return;
        }

        var stopHeartbeatSource = new CancellationTokenSource();
        cancellationToken.Register(stopHeartbeatSource.Cancel);

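        // WhenAny rather than WhenAll: the heartbeat loop only ends once it is cancelled,
        // so waiting for both tasks would not complete until the caller's token was cancelled.
        await Task.WhenAny(primaryTask, PerformHeartbeats(heartbeatInterval, heartbeatTaskFactory, stopHeartbeatSource.Token));

        if (!stopHeartbeatSource.IsCancellationRequested) {
            stopHeartbeatSource.Cancel();
        }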
    }

    public static Task WithHeartbeat(this Task primaryTask, TimeSpan heartbeatInterval, Func<CancellationToken, Task> heartbeatTaskFactory) {
        return WithHeartbeat(primaryTask, heartbeatInterval, heartbeatTaskFactory, CancellationToken.None);
    }

    private static async Task PerformHeartbeats(TimeSpan interval, Func<CancellationToken, Task> heartbeatTaskFactory, CancellationToken cancellationToken) {
        while (!cancellationToken.IsCancellationRequested) {
            try {
                await Task.Delay(interval, cancellationToken);
                if (!cancellationToken.IsCancellationRequested) {
                    await heartbeatTaskFactory(cancellationToken);
                }
            }
            catch (TaskCanceledException tce) {
                if (tce.CancellationToken == cancellationToken) {
                    // Totally expected
                    break;
                }
                throw;
            }
        }
    }

which would allow you to change the sample code to something like this:

private static async Task PerformHeartbeat(CancellationToken cancellationToken) {
    Console.WriteLine("Starting heartbeat {0}", ++_heartbeatCount);
    await Task.Delay(1000, cancellationToken);
    Console.WriteLine("Finishing heartbeat {0}", _heartbeatCount);
}

The PerformHeartbeat could be replaced with an async call like RenewLockAsync so that you wouldn't have to waste thread time using a blocking call like RenewLock that the Action approach would require.
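As a rough illustration of that idea, here is a minimal, self-contained sketch. The names FakeLockedMessage and HeartbeatSketch are hypothetical, and the fake RenewLockAsync only mimics a lock renewal with a short delay; it is not the Service Bus API. The sketch also assumes a compiler with C# 7.1 or later (async Main, exception filters) and uses CancellationTokenSource.CreateLinkedTokenSource in place of the Register call above, which is a substitution rather than the code from this answer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a locked queue message (not a real Service Bus type).
public sealed class FakeLockedMessage {
    private int _renewCount;
    public int RenewCount { get { return Volatile.Read(ref _renewCount); } }

    // Mimics an asynchronous lock renewal without blocking a thread.
    public async Task RenewLockAsync() {
        await Task.Delay(10);
        Interlocked.Increment(ref _renewCount);
    }
}

public static class HeartbeatSketch {
    public static async Task WithHeartbeat(this Task primaryTask, TimeSpan interval,
            Func<CancellationToken, Task> heartbeat, CancellationToken cancellationToken) {
        // The linked source is cancelled when the caller cancels or when we cancel it below.
        using (var stopSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) {
            Task heartbeats = PerformHeartbeats(interval, heartbeat, stopSource.Token);
            await Task.WhenAny(primaryTask, heartbeats);
            stopSource.Cancel();
            // Wait for the loop to exit so no heartbeat is still in flight on return.
            await heartbeats;
        }
    }

    private static async Task PerformHeartbeats(TimeSpan interval,
            Func<CancellationToken, Task> heartbeat, CancellationToken token) {
        try {
            while (true) {
                await Task.Delay(interval, token);
                token.ThrowIfCancellationRequested();
                await heartbeat(token);
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested) {
            // Expected: the primary task finished or the caller cancelled.
        }
    }
}

public static class Program {
    public static async Task Main() {
        var message = new FakeLockedMessage();
        // Stands in for lengthy message processing.
        Task processing = Task.Delay(TimeSpan.FromMilliseconds(550));
        await processing.WithHeartbeat(TimeSpan.FromMilliseconds(100),
            _ => message.RenewLockAsync(), CancellationToken.None);
        await processing; // observe any exception from the primary task
        Console.WriteLine("Renewals: {0}", message.RenewCount);
    }
}
```

Like the versions above, this sketch does not rethrow a failure of the primary task from WithHeartbeat itself, which is why the usage awaits processing separately afterwards.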

I'm answering my own question per SO guidelines, but I'm also open to more elegant approaches to this problem.

Jeff Moser
  • Hi, I got to this post from your comment on my question of similar interest. In the case of an SB queue, when and where exactly are you renewing the lock? The worker role I have is just a single thread, and the message receiving and processing run in a while loop in the Run() method. – Aravind Jul 15 '13 at 18:47
  • @Aravind When I receive the SB message, I create a Task to process it. The task uses this heartbeat helper to heartbeat as long as it is running. – Jeff Moser Jul 16 '13 at 13:04
  • Oh, OK. Since the message processing I use may not run that frequently, I did not create tasks for processing every message. I did not find any use of RenewLock in the sample code, which is why I asked. – Aravind Jul 17 '13 at 02:56
  • This is an old one - but was the `Task.WhenAny` supposed to be changed into `Task.WhenAll` in the "async" version? I'm working on a similar problem and seem to be running into the heartbeat never stopping after the primary task finishes. But changing back to `WhenAny` looks like it might work correctly for me now? – Peter Tirrell Mar 24 '20 at 15:51

Here's my approach

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication3
{
class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Start Main");
        StartTest().Wait();
        Console.ReadLine();
        Console.WriteLine("Complete Main");
    }

    static async Task StartTest()
    {
        var cts = new CancellationTokenSource();

        // Start the long-running work and the heartbeat side by side.
        Task<bool>[] tasks = new Task<bool>[2];
        tasks[0] = LongRunningTask("", 20, cts.Token);
        tasks[1] = Heartbeat("", 1, cts.Token);

        // WhenAny completes as soon as either task finishes. In practice that is
        // the long-running task, since the heartbeat loops until it is cancelled.
        Task<bool> firstFinishedTask = await Task.WhenAny(tasks);

        Console.WriteLine("first task Finished.");
        // Cancel the token so the heartbeat loop stops.
        cts.Cancel();

        // Await the finished task to observe its result (or rethrow its exception).
        var isCompleted = await firstFinishedTask;
        Console.WriteLine("isCompleted:  {0}", isCompleted);
    }

    private static async Task<bool> LongRunningTask(string id, int sleep, CancellationToken ct)
    {
        Console.WriteLine("Starting long task");


        await Task.Delay(TimeSpan.FromSeconds(sleep));

        Console.WriteLine("Completed long task");
        return true;
    }

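    private static async Task<bool> Heartbeat(string id, int sleep, CancellationToken ct)
    {
        while(!ct.IsCancellationRequested)
        {
            try
            {
                // Pass the token so cancellation interrupts the delay immediately.
                await Task.Delay(TimeSpan.FromSeconds(sleep), ct);
            }
            catch (OperationCanceledException)
            {
                // Cancelled while waiting: stop without printing another heartbeat.
                break;
            }
            Console.WriteLine("Heartbeat Task Sleep: {0} Second", sleep);
        }

        return true;
    }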

}

}

szmulder