2

I am creating a .NET Core worker service that is intended to run 10 different long-running functions, which parse various CSV files and spreadsheets.

Each function needs to run at its own times each day. If a function is still running when it is due to run again, the new run must 'await' the current one to finish before starting.

I am new to async/await. Could anyone suggest an approach that would allow all 10 functions to run in parallel but never the same function at once?

Thanks in advance!

EDIT:

  • These parsers take anywhere from 5 minutes to 5 hours to run.

  • Each function has its own requirements for exactly when to run, daily or even hourly.

  • If a function is still running when the same function is due to run again, the new run should be skipped until the next scheduled time, and skipped again if needed.

Theodor Zoulias
chrisg229
  • 1
    It sounds like you need a *scheduler* to coordinate all of this. [Quartz](https://www.nuget.org/packages/Quartz) can be added to a worker service with relative ease and your various parsing tasks become jobs that run on a schedule (you can configure the trigger to wait until the job is done before running it again). – madreflection Oct 04 '22 at 17:57
  • 1
    You're not going to get that kind of control by directly dealing with `Task`s unless you write *a lot* of that coordination code yourself. – madreflection Oct 04 '22 at 17:58
  • 1
    I need more information to answer this properly. You can run 10 tasks (or more) in parallel. It sounds like you need 10 different loops running in parallel, while the stuff that happens in the loop is sequential. That should also be doable. How long is long running though? Have you considered something like Azure Functions? – Jakob Busk Sørensen Oct 04 '22 at 18:00
  • 1
    You might find these two questions useful: [Run async method regularly with specified interval](https://stackoverflow.com/questions/30462079/run-async-method-regularly-with-specified-interval), and [.NET 6 PeriodicTimer with top-of-the-minute timing](https://stackoverflow.com/questions/72739162/net-6-periodictimer-with-top-of-the-minute-timing). – Theodor Zoulias Oct 04 '22 at 19:34
  • 1
    Quartz has job scheduling – Daniel A. White Oct 05 '22 at 13:23

4 Answers

1

How about using a WaitHandle?

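using System.Threading;
using System.Threading.Tasks;

// Set the initial state to signaled so the first caller can enter.
private readonly AutoResetEvent done = new AutoResetEvent(true);

public async Task DoWork()
{
    // Wait at most 1 ms to acquire the handle; skip this run if another one is in progress.
    if (!done.WaitOne(1)) return;

    try
    {
        // do some stuff here
    }
    finally
    {
        // Release the handle to other callers, even if the work throws.
        done.Set();
    }
}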

This guarantees that only one thread does the work at a time, provided the handle is released even when the work throws. For more information, see the AutoResetEvent documentation.
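As a possible variation on the same idea, here is a minimal sketch that uses a SemaphoreSlim instead of a WaitHandle. The class name NonOverlappingJob and the method TryRunAsync are made up for illustration and are not part of the answer above. The assumption is that one instance is created per parser function, so different functions can run in parallel while a second run of the same function is skipped.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not from the answer above): skips a run when the
// previous run of the same job is still in progress.
public sealed class NonOverlappingJob
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly Func<CancellationToken, Task> _work;

    public NonOverlappingJob(Func<CancellationToken, Task> work)
    {
        _work = work ?? throw new ArgumentNullException(nameof(work));
    }

    // Returns true if the work ran, false if it was skipped because a
    // previous run had not finished yet.
    public async Task<bool> TryRunAsync(CancellationToken token = default)
    {
        // Wait(0) never blocks: it takes the gate only if it is free right now.
        if (!_gate.Wait(0)) return false;
        try
        {
            await _work(token).ConfigureAwait(false);
            return true;
        }
        finally
        {
            // Always release the gate, even if the work throws.
            _gate.Release();
        }
    }
}
```

Whatever triggers the runs (a timer, a scheduler, a loop) would call TryRunAsync on the instance that belongs to the function that is due, and can log or ignore a false result.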

Sardelka
1

Here is a CronosTimer class similar in shape to the System.Timers.Timer class, that fires the Elapsed event on dates and times specified with a Cron expression. The event is fired in a non-overlapping manner. The CronosTimer has a dependency on the Cronos library by Sergey Odinokov. This library is a TimeSpan calculator, not a scheduler. Caveat: in its current version (0.7.1), the Cronos library is capped to the year 2099.

using System.Diagnostics; // For Debug.Assert
using Cronos;

/// <summary>
/// Generates non-overlapping events according to a Cron expression.
/// </summary>
public class CronosTimer : IAsyncDisposable
{
    private readonly System.Threading.Timer _timer; // Used also as the locker.
    private readonly CronExpression _cronExpression;
    private readonly CancellationTokenSource _cts;
    private Func<CancellationToken, Task> _handler;
    private Task _activeTask;
    private bool _disposed;
    private static readonly TimeSpan _minDelay = TimeSpan.FromMilliseconds(500);

    public CronosTimer(string expression, CronFormat format = CronFormat.Standard)
    {
        _cronExpression = CronExpression.Parse(expression, format);
        _cts = new();
        _timer = new(async _ =>
        {
            Task task;
            lock (_timer)
            {
                if (_disposed) return;
                if (_activeTask is not null) return;
                if (_handler is null) return;
                Func<CancellationToken, Task> handler = _handler;
                CancellationToken token = _cts.Token;
                _activeTask = task = Task.Run(() => handler(token));
            }
            try { await task.ConfigureAwait(false); }
            catch (OperationCanceledException) when (_cts.IsCancellationRequested) { }
            finally
            {
                lock (_timer)
                {
                    Debug.Assert(ReferenceEquals(_activeTask, task));
                    _activeTask = null;
                    if (!_disposed && _handler is not null) ScheduleTimer();
                }
            }
        });
    }

    private void ScheduleTimer()
    {
        Debug.Assert(Monitor.IsEntered(_timer));
        Debug.Assert(!_disposed);
        Debug.Assert(_handler is not null);
        DateTime utcNow = DateTime.UtcNow;
        DateTime? utcNext = _cronExpression.GetNextOccurrence(utcNow + _minDelay);
        if (utcNext is null)
            throw new InvalidOperationException("Unreachable date.");
        TimeSpan delay = utcNext.Value - utcNow;
        Debug.Assert(delay > _minDelay);
        _timer.Change(delay, Timeout.InfiniteTimeSpan);
    }

    /// <summary>
    /// Occurs when the next occurrence of the Cron expression has been reached,
    /// provided that the previous asynchronous operation has completed.
    /// The CancellationToken argument is canceled when the timer is disposed.
    /// </summary>
    public event Func<CancellationToken, Task> Elapsed
    {
        add
        {
            if (value is null) return;
            lock (_timer)
            {
                if (_disposed) return;
                if (_handler is not null) throw new InvalidOperationException(
                    "More than one handler is not supported.");
                _handler = value;
                if (_activeTask is null) ScheduleTimer();
            }
        }
        remove
        {
            if (value is null) return;
            lock (_timer)
            {
                if (_disposed) return;
                if (!ReferenceEquals(_handler, value)) return;
                _handler = null;
                _timer.Change(Timeout.Infinite, Timeout.Infinite);
            }
        }
    }

    /// <summary>
    /// Returns a ValueTask that completes when all work associated with the timer
    /// has ceased.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        Task task;
        lock (_timer)
        {
            if (_disposed) return;
            _disposed = true;
            _handler = null;
            task = _activeTask;
        }
        await _timer.DisposeAsync().ConfigureAwait(false);
        _cts.Cancel();
        if (task is not null)
            try { await task.ConfigureAwait(false); } catch { }
        _cts.Dispose();
    }
}

Usage example:

CronosTimer timer = new("30 6,14,22 * * MON-FRI");
timer.Elapsed += async _ =>
{
    try
    {
        await LongRunningAsync();
    }
    catch (Exception ex)
    {
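        // Assuming _logger is a Microsoft.Extensions.Logging.ILogger,
        // whose LogError extension method takes the exception and a message.
        _logger.LogError(ex, "LongRunningAsync failed.");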
    }
};

In this example the LongRunningAsync function will run at 6:30, 14:30 and 22:30 of every working day of the week.

You can find detailed documentation about the format of the Cron expressions in the documentation of the Cronos library.

For simplicity, the Elapsed event supports only one handler at a time. Subscribing twice with += without unsubscribing with -= results in an exception.

Theodor Zoulias
  • This looks great! How would I go about adding LongRunningAsync2() alongside LongRunningAsync() within the usage? How would you envision these existing within the Windows worker service? – chrisg229 Oct 11 '22 at 15:13
  • 1
    @chrisg229 you can just create a separate `CronosTimer` for each long running function, and invoke this function from the `Elapsed` event handler of the associated `CronosTimer`. You could consider putting these timers in a list, so that you can dispose them when the service stops. Unless you are OK with the service stopping abruptly, while some timers might be in the midst of executing their function. – Theodor Zoulias Oct 11 '22 at 17:28
  • 1
    Thank you so much for taking the time to help me sort this out. I will be studying this for a while!! – chrisg229 Oct 11 '22 at 18:22
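To illustrate the suggestion in the comments of keeping the timers in a list and disposing them when the service stops, here is a minimal sketch. The TimerCollection class is hypothetical. It only assumes that each timer implements IAsyncDisposable, as the CronosTimer above does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical container: keeps one timer per long-running function and
// disposes all of them when the service stops.
public sealed class TimerCollection : IAsyncDisposable
{
    private readonly List<IAsyncDisposable> _timers = new List<IAsyncDisposable>();

    public void Add(IAsyncDisposable timer) => _timers.Add(timer);

    public async ValueTask DisposeAsync()
    {
        // Start disposing every timer first, then await them together, so
        // that one slow function does not delay the cancellation of the others.
        Task[] pending = _timers.Select(t => t.DisposeAsync().AsTask()).ToArray();
        _timers.Clear();
        await Task.WhenAll(pending).ConfigureAwait(false);
    }
}
```

In a worker service, one way to use it would be to create and add the timers at the start of `ExecuteAsync`, wait until the `stoppingToken` is canceled, and then `await` the collection's `DisposeAsync` so that any function still in flight gets a chance to observe the cancellation and finish.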
0

Although I use async/await a lot, I don't use it everywhere. In this case I would use timers (single-shot, non-repeating), and at the end of each function (or of a wrapper for it) I would set the timer again. This guarantees that executions of the same function will not overlap.
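A minimal sketch of this idea, assuming a System.Threading.Timer in one-shot mode. The RearmingTimer class and its delayUntilNextRun parameter are hypothetical names. The delegate is expected to return a non-negative delay until the next run, computed from the current UTC time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: a one-shot timer that is re-armed only after the
// work has finished, so two runs of the same job can never overlap.
public sealed class RearmingTimer : IDisposable
{
    private readonly Timer _timer;
    private readonly Func<Task> _work;
    private readonly Func<DateTime, TimeSpan> _delayUntilNextRun;
    private volatile bool _disposed;

    public RearmingTimer(Func<Task> work, Func<DateTime, TimeSpan> delayUntilNextRun)
    {
        _work = work ?? throw new ArgumentNullException(nameof(work));
        _delayUntilNextRun = delayUntilNextRun
            ?? throw new ArgumentNullException(nameof(delayUntilNextRun));
        // Created disarmed; Start() schedules the first run.
        _timer = new Timer(state => { _ = RunOnceAsync(); }, null,
            Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    public void Start() => Arm();

    private async Task RunOnceAsync()
    {
        try { await _work().ConfigureAwait(false); }
        catch (Exception ex) { Console.Error.WriteLine(ex); } // placeholder for real logging
        finally { Arm(); } // re-arm only after the run has ended
    }

    private void Arm()
    {
        if (_disposed) return;
        // Infinite period: the timer fires once and then stays idle.
        try { _timer.Change(_delayUntilNextRun(DateTime.UtcNow), Timeout.InfiniteTimeSpan); }
        catch (ObjectDisposedException) { } // disposed concurrently; nothing to re-arm
    }

    public void Dispose()
    {
        _disposed = true;
        _timer.Dispose();
    }
}
```

Creating one such timer per function lets the functions run concurrently with each other, since each callback runs on the thread pool independently, while each individual function waits for its own previous run to end before it is scheduled again.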

greenoldman
  • How would you suggest setting them up so they are able to run concurrently? – chrisg229 Oct 05 '22 at 12:57
  • @chrisg229 Straightforward: set them all up the way you would like the actions to run. Unless you are thinking about some kind of optimization, where each job gets its own CPU core or something like that, so that a job is shifted to a free CPU slot. But this is more complex, because you would basically be writing a scheduler, and I am sure those are already written :-). – greenoldman Oct 05 '22 at 15:37
-1

Curious about thoughts on this approach.

Worker class:

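protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    //If(NOT SCHEDULED TIME RETURN) //QUARTZ JOB SCHEDULING HERE???

    // Start each loop exactly once. Calling these methods repeatedly inside a
    // while loop would spawn a new, never-ending loop on every iteration.
    // The loops run in parallel with each other; each one runs its own parser
    // sequentially, so the same parser never overlaps with itself.
    await Task.WhenAll(
        _tasksManager.LongProccess1Async(stoppingToken),
        _tasksManager.LongProccess2Async(stoppingToken));
}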

Task Manager Class:

private Task<long>? _taskLongProccess1;

public async Task LongProccess1Async(CancellationToken stoppingToken)
{
    var pc = _parserConfigs.Find(p => p.Name == "longProccess1");

    while (!stoppingToken.IsCancellationRequested)
    {
        if (_taskLongProccess1 == null)
            _taskLongProccess1 = Task.Run(() => _delimitedParser.LongProccess1(pc.FilePath, pc.Delimiter, pc.ConnectionString, pc.Schema, pc.BulkSize));

        if (!_taskLongProccess1.IsCompleted)
        {
            // Still in progress: check again after the configured delay.
            // (IsCompleted is used because TaskStatus.Running is not reported for every kind of task.)
            await Task.Delay(pc.Delay, stoppingToken);
            continue;
        }

        if (_taskLongProccess1.Status == TaskStatus.RanToCompletion)
        {
            //ONCE DONE LOG (assuming the task result is the execution time)
            LoggingFunctions.addToLog(_logger, $"Total execution time for task:LongProccess1 = {_taskLongProccess1.Result}", LoggingHelpers.InformationCode);
        }
        else
        {
            // Faulted or canceled: wait before retrying to avoid a tight failure loop.
            await Task.Delay(pc.Delay, stoppingToken);
        }

        //NULL TASK so the next run can start
        _taskLongProccess1 = null;
    }
}


private Task<long>? _taskLongProccess2;

public async Task LongProccess2Async(CancellationToken stoppingToken)
{
    var pc = _parserConfigs.Find(p => p.Name == "longProccess2");

    while (!stoppingToken.IsCancellationRequested)
    {
        if (_taskLongProccess2 == null)
            _taskLongProccess2 = Task.Run(() => _delimitedParser.LongProccess2(pc.FilePath, pc.Delimiter, pc.ConnectionString, pc.Schema, pc.BulkSize));

        if (!_taskLongProccess2.IsCompleted)
        {
            // Still in progress: check again after the configured delay.
            // (IsCompleted is used because TaskStatus.Running is not reported for every kind of task.)
            await Task.Delay(pc.Delay, stoppingToken);
            continue;
        }

        if (_taskLongProccess2.Status == TaskStatus.RanToCompletion)
        {
            //ONCE DONE LOG (assuming the task result is the execution time)
            LoggingFunctions.addToLog(_logger, $"Total execution time for task:LongProccess2 = {_taskLongProccess2.Result}", LoggingHelpers.InformationCode);
        }
        else
        {
            // Faulted or canceled: wait before retrying to avoid a tight failure loop.
            await Task.Delay(pc.Delay, stoppingToken);
        }

        //NULL TASK so the next run can start
        _taskLongProccess2 = null;
    }
}
chrisg229
  • 2
    There are way too many problems with this approach. You know about await/async and now you are trying to force it onto your problem: manually handling tasks, missing .ConfigureAwait(false), and using separate fields instead of a List (so the solution won't scale). – greenoldman Oct 05 '22 at 15:42
  • 1
    How does this answer address the need to run each function at exact times, like at 16:30 every day? – Theodor Zoulias Oct 05 '22 at 22:48
  • 1
    I think the schedule distracts from my main confusion, which is how to properly run 10 functions simultaneously. I am looking forward to digging into your solution. Many thanks! – chrisg229 Oct 06 '22 at 13:16