I have a BackgroundService (IHostedService) that implements ExecuteAsync similar to the example in Implement background tasks in microservices with IHostedService and the BackgroundService class.

I want to run several tasks simultaneously, each executing a long-running command in an external system. The service should keep executing these tasks until it is stopped, but I don't want to execute the command with the same parameters at the same time (if it's executing with item.parameter1 = 123, I want it to wait until 123 is done, then execute with 123 again). It should also not block and should not leak memory. If an exception occurs, I'd like to stop the offending task gracefully, log it, and restart it. Each execution of the command gets different parameters, so something like this:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var items = GetItems(); //GetItems returns a List<Item>

    _logger.LogDebug($"ExternalCommandService is starting.");

    stoppingToken.Register(() => 
            _logger.LogDebug($" Execute external command background task is stopping."));

    while (!stoppingToken.IsCancellationRequested)
    {
        _logger.LogDebug($"External command task is doing background work.");

        //Execute the command with values from the item
        foreach(var item in items)
        {
             ExecuteExternalCommand(item);
        }

        await Task.Delay(_settings.CheckUpdateTime, stoppingToken);
    }
    
    _logger.LogDebug($"Execute external command background task is stopping.");

}

The structure of the data is pretty simple:

public class MyData
{
    public string Foo { get; set; }
    public string Blee { get; set; }
}

I'm a complete newbie when it comes to Task development so please forgive my lack of understanding. Would it make more sense to make ExecuteExternalCommand asynchronous? I'm not certain that these tasks are being executed in parallel. What am I doing wrong? How do I accomplish the other requirements like exception handling and graceful restarting of the tasks?

Theodor Zoulias
TrevorBrooks
  • What is in `QueryTheDatabase`? – Peter Bons Nov 10 '18 at 20:29
  • It's a synchronous method that calls a long running command in another system. The name I gave it here doesn't accurately reflect what it does, I'll update it. – TrevorBrooks Nov 10 '18 at 21:30
  • Your code is running sequentially and async doesn’t mean parallel execution. If you want to run your foreach-loop in parallel use `Parallel.ForEach(items, ExecuteExternalCommand);`. Also, do you really want that your items are processed over and over in your virtually infinite while-loop? Shouldn’t the items list be cleared at some time? – ckuri Nov 11 '18 at 10:36
  • With regard to your other requirements instead of directly calling ExecuteExternalCommand do `Parallel.ForEach(items, item => { while (!stoppingToken.IsCancellationRequested) try { ExecuteExternalCommand(item); break; } catch (Exception e) { _logger.LogError(e); } });`, so that when an exception happens it’s logged and the command is retried. If the command is successful the code will break out of the loop. Or you could just remove an item from your list when it was successfully executed, if they only need to be executed once. – ckuri Nov 11 '18 at 10:53
  • As they complete I'd like them to execute again over and over until they are cancelled or the service is stopped. – TrevorBrooks Nov 12 '18 at 13:50
  • If `ExecuteExternalCommand` is a long running command, it might be worthwhile to pass it the cancellation token so it can be cancelled, otherwise your service won't stop until it returns – ESG Nov 12 '18 at 18:36
  • How many operations are you wanting to run? It sounds like you want a unique operation for each unique type. `item.parameter1 = 123` on operation that loops until stopped and so on. If so then the recommendations for `Parallel` may not be the best option. This is for splitting a single complex operation into many (forking and joining). I believe your work is a bit simpler where you can use a query to break out the parameters into groups or a single list and sending each one into its own `Task` method that can run until stopped. – Michael Puckett II Nov 12 '18 at 20:00
  • @Michael Puckett II Yes, for each item in items I want a task to do the work, return when complete and then run again. I'd like each task to run simultaneously with different items. – TrevorBrooks Nov 12 '18 at 20:10
  • @TrevorBrooks if you have a lot of different tasks with different scheduling needs you should probably look at a proper job scheduler like Hangfire. `ExecuteAsync` is just the method called to execute whatever you want in the background. – Panagiotis Kanavos Nov 13 '18 at 09:32
  • Related: [Dynamically processing a concurrent collection in parallel by group but serially within each group](https://stackoverflow.com/questions/71000722/dynamically-processing-a-concurrent-collection-in-parallel-by-group-but-serially). – Theodor Zoulias Feb 15 '23 at 09:00
  • Trevor I changed the title to something more descriptive. Feel free to rollback if you don't like the new title. – Theodor Zoulias Feb 15 '23 at 09:13

1 Answer

You have data that you want grouped by a particular value, with each group then run repeatedly in its own loop. I will give an example of one way to do this that hopefully you can lean on to get what you need.

Note: This example may not be useful based on how the data is mocked. If you provide the proper data structure in your question I'll update the answer to match.

You have data.

public struct Data
{
    public Data(int value) => Value = value;
    public int Value { get; }
    public string Text => $"{nameof(Data)}: {Value}";
}

You want the data grouped by a particular value. (NOTE: This example may not be logical because it groups the data by Value, which is already unique. It is included purely as an illustration.)

There are many ways to do this, but for this example I'll use LINQ's Distinct with an IEqualityComparer<T> that compares Data.Value.

public class DataDistinction : IEqualityComparer<Data>
{
    public bool Equals(Data x, Data y) => x.Value == y.Value;
    public int GetHashCode(Data obj) => obj.Value.GetHashCode();
}
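One thing to keep in mind is that Distinct keeps only the first item for each key and drops the rest. If several items can share the same parameter and all of them need to be processed, GroupBy may be a better fit. The following is only a sketch: the Item type and its Key and Payload members are hypothetical stand-ins, not part of the question's data model.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GroupingSketch
{
    // Hypothetical item type; Key stands in for the parameter that must not run concurrently.
    public sealed class Item
    {
        public Item(int key, string payload) { Key = key; Payload = payload; }
        public int Key { get; }
        public string Payload { get; }
    }

    // Groups items by Key so that each key can later be given exactly one worker loop.
    // Unlike Distinct, every item is retained inside its group.
    public static Dictionary<int, List<Item>> GroupByKey(IEnumerable<Item> items) =>
        items.GroupBy(i => i.Key)
             .ToDictionary(g => g.Key, g => g.ToList());

    public static void Main()
    {
        var items = new[]
        {
            new Item(123, "a"),
            new Item(123, "b"),
            new Item(456, "c"),
        };

        foreach (var pair in GroupByKey(items))
        {
            Console.WriteLine($"Key {pair.Key}: {pair.Value.Count} item(s)");
        }
    }
}
```

Each dictionary entry can then be handed to a single loop, so work for the same key is serialized while different keys proceed independently.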

Mock the Data

For this example I'll mock some data.

var dataItems = new List<Data>();

for (var i = 0; i < 10; i++)
{
    dataItems.Add(new Data(i));
}

Process the Data

For this example I'll use the IEqualityComparer<T> to get each Data uniquely by Value and start the processing.

private static void ProcessData(List<Data> dataItems)
{
    var groupedDataItems = dataItems.Distinct(new DataDistinction());

    foreach (var data in groupedDataItems)
    {
        LoopData(data); //We start unique loops here.
    }
}

Loop the Unique Data

For this example I chose to start a new Task using Task.Factory.StartNew. Typically you don't need it, but this is one case where it makes sense, because it lets the loop be marked as LongRunning. I pass in an Action<object>, where the object represents the state (the parameter) given to the Task. Inside the delegate I cast the state back to Data and start a while loop, which checks the CancellationTokenSource for cancellation on every iteration. I declared the CancellationTokenSource statically in the app for convenience, as you will see in the full Console App at the bottom.

private static void LoopData(Data data)
{
    Task.Factory.StartNew((state) =>
    {
        var taskData = (Data)state;
        while (!cancellationTokenSource.IsCancellationRequested)
        {
            Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
            Task.Delay(100).Wait();
        }
    },
    data,
    cancellationTokenSource.Token,
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);
}
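The question also asks about logging an exception and restarting the failed work, which the loop above does not cover. One possible approach, shown here only as a sketch, is to wrap each iteration in a try/catch so that a failure is reported and the loop continues with the next attempt. This swaps the LongRunning StartNew call for an async loop built on Task.Run and Task.Delay. RunLoopAsync and its work and onError parameters are hypothetical names introduced for illustration; work stands in for the synchronous ExecuteExternalCommand from the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ResilientLoopSketch
{
    // Runs work(item) repeatedly until the token is cancelled.
    // A failure is reported through onError and the loop carries on after the delay.
    public static async Task RunLoopAsync<T>(
        T item,
        Action<T> work,
        Action<T, Exception> onError,
        TimeSpan delay,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                // The work is assumed to be synchronous, so it is pushed to the thread pool.
                await Task.Run(() => work(item), token);
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                break; // Normal shutdown.
            }
            catch (Exception ex)
            {
                onError(item, ex); // Log, then try again on the next iteration.
            }

            try
            {
                // Pause between attempts; this also prevents a tight retry loop after a failure.
                await Task.Delay(delay, token);
            }
            catch (OperationCanceledException)
            {
                break; // Cancelled while waiting.
            }
        }
    }

    public static async Task Main()
    {
        var attempts = 0;
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)))
        {
            await RunLoopAsync(
                123,
                item =>
                {
                    // Fail on the first attempt only, to show the loop surviving an error.
                    if (++attempts == 1)
                        throw new InvalidOperationException("simulated failure");
                    Console.WriteLine($"Processed {item}");
                },
                (item, ex) => Console.WriteLine($"Item {item} failed: {ex.Message}"),
                TimeSpan.FromMilliseconds(100),
                cts.Token);
        }
        Console.WriteLine("Loop stopped.");
    }
}
```

Because each item has exactly one loop, the same parameters are never processed twice at the same time, and an exception only affects the iteration in which it occurred.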

Now I put it all into a single Console App you can copy and paste and explore.

This will take your data, break it up, and run each item in its own Task indefinitely until the program is ended.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Question_Answer_Console_App
{
    class Program
    {
        private static readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        static void Main(string[] args)
        {
            var dataItems = new List<Data>();

            for (var i = 0; i < 10; i++)
            {
                dataItems.Add(new Data(i));
            }   

            ProcessData(dataItems);
            Console.ReadKey();
            cancellationTokenSource.Cancel();
            Console.WriteLine("CANCELLING...");
            Console.ReadKey();
        }

        private static void ProcessData(List<Data> dataItems)
        {
            var groupedDataItems = dataItems.Distinct(new DataDistinction());

            foreach (var data in groupedDataItems)
            {
                LoopData(data);
            }
        }

        private static void LoopData(Data data)
        {
            Task.Factory.StartNew((state) =>
            {
                var taskData = (Data)state;
                while (!cancellationTokenSource.IsCancellationRequested)
                {
                    Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
                    Task.Delay(100).Wait();
                }
            },
            data,
            cancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }

        ~Program() => cancellationTokenSource?.Dispose();
    }

    public struct Data
    {
        public Data(int value) => Value = value;
        public int Value { get; }
        public string Text => $"{nameof(Data)}: {Value}";
    }

    public class DataDistinction : IEqualityComparer<Data>
    {
        public bool Equals(Data x, Data y) => x.Value == y.Value;
        public int GetHashCode(Data obj) => obj.Value.GetHashCode();
    }
}
//OUTPUT UNTIL CANCELLED
//Value: 0        Text: Data: 0
//Value: 3        Text: Data: 3
//Value: 1        Text: Data: 1
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 5        Text: Data: 5
//Value: 6        Text: Data: 6
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 5        Text: Data: 5
//Value: 1        Text: Data: 1
//Value: 7        Text: Data: 7
//Value: 4        Text: Data: 4
//Value: 0        Text: Data: 0
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 6        Text: Data: 6
//Value: 5        Text: Data: 5
//Value: 3        Text: Data: 3
//Value: 8        Text: Data: 8
//Value: 4        Text: Data: 4
//Value: 1        Text: Data: 1
//Value: 2        Text: Data: 2
//Value: 9        Text: Data: 9
//Value: 7        Text: Data: 7
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 8        Text: Data: 8
//Value: 5        Text: Data: 5
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 1        Text: Data: 1
//Value: 4        Text: Data: 4
//Value: 9        Text: Data: 9
//Value: 7        Text: Data: 7
//Value: 0        Text: Data: 0
//Value: 6        Text: Data: 6
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 9        Text: Data: 9
//Value: 1        Text: Data: 1
//Value: 4        Text: Data: 4
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 5        Text: Data: 5
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 4        Text: Data: 4
//Value: 1        Text: Data: 1
//Value: 0        Text: Data: 0
//Value: 6        Text: Data: 6
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 1        Text: Data: 1
//Value: 9        Text: Data: 9
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 4        Text: Data: 4
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 4        Text: Data: 4
//Value: 9        Text: Data: 9
//Value: 1        Text: Data: 1
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 5        Text: Data: 5
//Value: 0        Text: Data: 0
//Value: 6        Text: Data: 6
//Value: 4        Text: Data: 4
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 5        Text: Data: 5
//Value: 7        Text: Data: 7
//Value: 9        Text: Data: 9
//Value: 8        Text: Data: 8
//Value: 1        Text: Data: 1
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 3        Text: Data: 3
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 7        Text: Data: 7
//Value: 1        Text: Data: 1
//Value: 0        Text: Data: 0
//Value: 6        Text: Data: 6
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 3        Text: Data: 3
//Value: 1        Text: Data: 1
//Value: 7        Text: Data: 7
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
// CANCELLING...

In the comments you asked how to write to the console when the Task is complete. You can await the Task in this example, and when it completes you can print that Data. async void is generally discouraged, because the caller can neither await the method nor observe its exceptions, but here the call is deliberately fire-and-forget, so this is one case where it is acceptable.

Here's the updated LoopData with the async await signature.

private static async void LoopData(Data data)
{
    await Task.Factory.StartNew((state) =>
    {
        var taskData = (Data)state;
        while (!cancellationTokenSource.IsCancellationRequested)
        {
            Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
            Task.Delay(100).Wait();
        }
    },
    data,
    cancellationTokenSource.Token,
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    Console.WriteLine($"Task Complete: {data.Value} : {data.Text}");
}

//OUTPUT
//Value: 0        Text: Data: 0
//Value: 1        Text: Data: 1
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 5        Text: Data: 5
//Value: 6        Text: Data: 6
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 1        Text: Data: 1
//Value: 5        Text: Data: 5
//Value: 4        Text: Data: 4
//Value: 7        Text: Data: 7
//Value: 9        Text: Data: 9
//Value: 8        Text: Data: 8
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 5        Text: Data: 5
//Value: 1        Text: Data: 1
//Value: 6        Text: Data: 6
//Value: 9        Text: Data: 9
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 0        Text: Data: 0
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 1        Text: Data: 1
//Value: 4        Text: Data: 4
//Value: 5        Text: Data: 5
//Value: 9        Text: Data: 9
//Value: 6        Text: Data: 6
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 1        Text: Data: 1
//Value: 5        Text: Data: 5
//Value: 4        Text: Data: 4
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 9        Text: Data: 9
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 1        Text: Data: 1
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 6        Text: Data: 6
//Value: 7        Text: Data: 7
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 5        Text: Data: 5
//Value: 4        Text: Data: 4
//Value: 1        Text: Data: 1
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 6        Text: Data: 6
//Value: 9        Text: Data: 9
// CANCELLING...
//Task Complete: 0 : Data: 0
//Task Complete: 2 : Data: 2
//Task Complete: 3 : Data: 3
//Task Complete: 1 : Data: 1
//Task Complete: 5 : Data: 5
//Task Complete: 4 : Data: 4
//Task Complete: 8 : Data: 8
//Task Complete: 6 : Data: 6
//Task Complete: 7 : Data: 7
//Task Complete: 9 : Data: 9
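If the caller needs to know when every loop has actually finished, for example before a service reports that it has stopped, an alternative to async void is to return the Task from each loop and await them together. This is a minimal sketch under stated assumptions: LoopDataAsync and RunUntilCancelledAsync are hypothetical names, the items are plain ints rather than the Data struct, and the loop is an async one based on Task.Delay rather than the LongRunning StartNew call used above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitableLoopsSketch
{
    // Loops until cancelled and returns how many iterations ran.
    // Returning Task<int> instead of void lets the caller await completion.
    public static async Task<int> LoopDataAsync(int value, CancellationToken token)
    {
        var iterations = 0;
        while (!token.IsCancellationRequested)
        {
            iterations++;
            try
            {
                await Task.Delay(100, token);
            }
            catch (OperationCanceledException)
            {
                break; // Cancelled while waiting.
            }
        }

        Console.WriteLine($"Task Complete: {value}");
        return iterations;
    }

    // Starts one loop per value and completes only when all of them have stopped.
    public static async Task<int[]> RunUntilCancelledAsync(IEnumerable<int> values, CancellationToken token)
    {
        var loops = values.Select(v => LoopDataAsync(v, token)).ToList();
        return await Task.WhenAll(loops);
    }

    public static async Task Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            var all = RunUntilCancelledAsync(Enumerable.Range(0, 3), cts.Token);

            await Task.Delay(350); // Let the loops run for a moment.
            cts.Cancel();

            var counts = await all; // Returns once every loop has finished.
            Console.WriteLine($"Stopped {counts.Length} loops.");
        }
    }
}
```

With this shape, any exception that escapes a loop surfaces when the combined task is awaited instead of going unobserved.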
Michael Puckett II
  • This is great, thank you...just working my way through it. – TrevorBrooks Nov 12 '18 at 21:21
  • I have added the structure of my data. – TrevorBrooks Nov 12 '18 at 23:09
  • Thank you for your answer it really helped me a lot. I was able to incorporate this into my work. One last question if it's not too difficult, how do I write to the console when each task completes? – TrevorBrooks Nov 14 '18 at 21:28
  • Hey @TrevorBrooks I've updated the question with your comment question here at the bottom. Let me know if you have any more questions and apologies I didn't see the previous ones until now. – Michael Puckett II Nov 14 '18 at 23:30
  • Thank you. That line actually never gets called. :-/ – TrevorBrooks Nov 15 '18 at 13:56
  • It will once you press a key in the console. The task is set to run indefinitely until you tell it to stop. This example is designed so that you press any key to stop the tasks then press any key again to end the app. – Michael Puckett II Nov 15 '18 at 16:33