
I am somewhat new to parallel programming in C# (when I started my project, I worked through the MSDN examples for the TPL) and would appreciate some input on the following example code. It is one of several background worker tasks; this specific task pushes status messages to a log.

var uiCts = new CancellationTokenSource();
var globalMsgQueue = new ConcurrentQueue<string>();

var backgroundUiTask = new Task(
() =>
{
    while (!uiCts.IsCancellationRequested)
    {
        while (globalMsgQueue.Count > 0)
            ConsumeMsgQueue();
        Thread.Sleep(backgroundUiTimeOut);
    }
},
uiCts.Token);

// Somewhere else entirely
backgroundUiTask.Start();
Task.WaitAll(backgroundUiTask);

I'm asking for professional input after reading several related questions, such as "Alternatives to using Thread.Sleep for waiting", "Is it always bad to use Thread.Sleep()?", "When to use Task.Delay, when to use Thread.Sleep?" and "Continuous polling using Tasks".

These prompt me to use Task.Delay instead of Thread.Sleep as a first step, and to introduce TaskCreationOptions.LongRunning.

What other caveats might I be missing? Is polling globalMsgQueue.Count a code smell? Would a better version rely on an event instead?

0x492559F64E
  • `// Somewhere else entirely ... backgroundUiTask.Start()` that's a bug. Tasks aren't threads, they represent stuff that may or may not run on threads from a threadpool. There's seldom any reason to use the Task constructor and definitely *no* reason to use it in common code. Use Task.Run instead. – Panagiotis Kanavos Nov 21 '19 at 09:11
  • Instead of MsgQueue.Count, you have access to [ConcurrentQueue.TryDequeue(T)](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentqueue-1.trydequeue?view=netframework-4.8) – A Friend Nov 21 '19 at 09:12
  • Apart from that, what do you mean `background tasks`? The task started from a single `Task.Run` is a background task already. If you want to create a background worker with a queue, you can use the built-in [ActionBlock class](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1?view=netcore-3.0). If you want greater control, use a [Channel](https://www.stevejgordon.co.uk/an-introduction-to-system-threading-channels). You don't need to use a ConcurrentQueue at all – Panagiotis Kanavos Nov 21 '19 at 09:13
  • @PanagiotisKanavos Thank you for the input, I will think about how I can change this in my project. The creation as Task objects initially stems from the project's design, where a core method in the base class runs and manages active tasks. – 0x492559F64E Nov 21 '19 at 09:21
  • @PanagiotisKanavos I was not aware of ActionBlock and Channel. I will invest time to read up on them. – 0x492559F64E Nov 21 '19 at 09:26
  • @0x492559F64E `where a core method in the base class runs and manages active tasks.` that's not how tasks work. No matter what you do in your code, task execution is **only** controlled by the TaskScheduler you use and the thread pool (if you use the default task scheduler). The only way to affect how tasks are executed is to use a custom task scheduler. If you want to limit the number of concurrent tasks for example you'd need to use a TaskScheduler that limits – Panagiotis Kanavos Nov 21 '19 at 09:49
  • @0x492559F64E check the `LimitedConcurrencyLevelTaskScheduler` in the [TaskScheduler](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=netframework-4.8) documentation. Also note that tasks are started with `TaskFactory.StartNew`, not `Task.Start`. The API is designed this way - the factory knows what scheduler to use and starts tasks correctly. `Task.Run` simplifies this by using specific defaults for the factory and task options – Panagiotis Kanavos Nov 21 '19 at 09:52
  • @0x492559F64E [Task.Start](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.start?view=netframework-4.8) can also accept a scheduler but as the docs say, `Task.Start` shouldn't be used in most cases, certainly not to control concurrency : `you do this when you need to separate the task's creation from its execution, such as when you conditionally execute tasks that you've created. For the more common case in which you don't need to separate task instantiation from execution, we recommend that you call an overload of the Task.Run or TaskFactory.StartNew method.` – Panagiotis Kanavos Nov 21 '19 at 09:55
  • @PanagiotisKanavos My comment was too open for interpretation: What I meant is the single point of start up of the application. The Main if you will. It starts the various tasks and waits for the completion (Task.WaitAll) in case of a graceful shutdown. "Manages" really only is a list of active processes that can be queried from the UI for debugging and maintenance purposes. – 0x492559F64E Nov 21 '19 at 09:56
  • @0x492559F64E there's no need to use `Task.Start` for that. Tasks aren't threads. It sounds like you're trying to create a [BackgroundService](https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.2&tabs=visual-studio#backgroundservice-base-class) which, despite what the page implies, can be used in any .NET Standard compliant runtime, just like all Microsoft.Extensions.* packages. – Panagiotis Kanavos Nov 21 '19 at 09:58
  • @PanagiotisKanavos I'm not sure about the BackgroundService. To be more precise, it is a tcp server. I might be able to publish the core to GitHub under CC for educational purposes but don't have green light for this yet. My main intention was to keep different aspects of the server responsive without starving the ThreadPool and unnecessarily using up resources. – 0x492559F64E Nov 21 '19 at 10:07

3 Answers


First of all, there's no reason to use Task.Start or the Task constructor. Tasks aren't threads; they don't run themselves. A task is a promise that something will complete in the future, and it may or may not produce a result. Some tasks will run on a thread-pool thread. Use Task.Run to create and start a task in a single step when you need to.
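A minimal sketch of the difference between a "cold" task created with the constructor and a "hot" one returned by Task.Run, written as a .NET 6+ top-level program (the printed messages are placeholders):

```csharp
using System;
using System.Threading.Tasks;

// Task constructor: the task is only created; nothing runs until Start() is called.
var cold = new Task(() => Console.WriteLine("cold task ran"));
Console.WriteLine(cold.Status); // Created

// Task.Run: creates the task and queues it to the thread pool in one step.
var hot = Task.Run(() => Console.WriteLine("hot task ran"));

// Await the task instead of blocking a thread on it.
await hot;
Console.WriteLine(hot.Status); // RanToCompletion

// The cold task never runs unless someone remembers to call Start().
Console.WriteLine(cold.Status); // Created
```

Forgetting the Start() call on a constructed task leaves anything awaiting it hanging forever, which is one reason Task.Run is the safer default.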

I assume the actual problem is how to create a buffered background worker. .NET already offers classes that can do this.

ActionBlock<T>

The ActionBlock class (from the TPL Dataflow library, namespace System.Threading.Tasks.Dataflow) already implements this and a lot more: it lets you specify how big the input buffer is and how many tasks process incoming messages concurrently, and it supports cancellation and asynchronous completion.

A logging block could be as simple as this:

_logBlock=new ActionBlock<string>(msg=>File.AppendAllText("myLog.txt",msg));

The ActionBlock class itself takes care of buffering the inputs, feeding new messages to the worker function as they arrive, and throttling senders if a bounded buffer gets full (SendAsync waits for room, while Post returns false). There's no need for polling.

Other code can use Post or SendAsync to send messages to the block:

_logBlock.Post("some message");

When we're done, we can call Complete() on the block and await its Completion task, which finishes once any remaining messages have been processed:

_logBlock.Complete();
await _logBlock.Completion;
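A self-contained sketch of the buffering options, assuming the System.Threading.Tasks.Dataflow package is referenced. The in-memory `written` queue stands in for the log file and the `gate` event artificially slows the worker so the buffer fills up; both are illustrative only:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow package

var written = new ConcurrentQueue<string>();  // stands in for the log file
var gate = new ManualResetEventSlim(false);   // holds the worker so the buffer fills up

var logBlock = new ActionBlock<string>(
    msg =>
    {
        gate.Wait();          // simulate a slow write
        written.Enqueue(msg);
    },
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 2,        // at most 2 messages buffered or in progress
        MaxDegreeOfParallelism = 1  // one write at a time (also the default)
    });

bool first = logBlock.Post("one");
bool second = logBlock.Post("two");
bool third = logBlock.Post("three");  // block is full: Post declines instead of waiting

// SendAsync waits asynchronously for room instead of declining.
Task<bool> pending = logBlock.SendAsync("four");

gate.Set();                  // let the worker proceed
bool fourth = await pending; // true once "four" has been accepted

logBlock.Complete();
await logBlock.Completion;   // every accepted message has been processed
Console.WriteLine(string.Join(",", written)); // one,two,four
```

Post is convenient for fire-and-forget logging, but with a bounded buffer its return value has to be checked; SendAsync is the variant that applies back-pressure.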

Channels

A newer, lower-level option is to use Channels (System.Threading.Channels). You can think of channels as a kind of asynchronous queue, although they can be used to implement complex processing pipelines. If ActionBlock were written today, it would use Channels internally.

With channels, you need to provide the "worker" task yourself. There's no need for polling though, as the ChannelReader class allows you to read messages asynchronously or even use await foreach.

The writer method could look like this:

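public ChannelWriter<string> LogIt(string path,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
    // Consumer loop: ReadAllAsync yields messages as they arrive and ends
    // when the writer is completed and the buffer is empty, or on cancellation
    _=Task.Run(async ()=>{
        await foreach(var msg in channel.Reader.ReadAllAsync(token))
        {
            File.AppendAllText(path,msg);
        }
    },token).ContinueWith(t=>writer.TryComplete(t.Exception)); // close the channel however the loop ends
    return writer;
}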

....

_logWriter=LogIt(somePath);

Other code can send messages by using WriteAsync or TryWrite, e.g.:

_logWriter.TryWrite(someMessage);

When we're done, we can call Complete() or TryComplete() on the writer :

_logWriter.TryComplete();
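Completing the writer does not by itself wait for the messages still sitting in the buffer. One way to wait for them is to hand the consumer task back alongside the writer. In this sketch, `StartLogger` is a hypothetical variant of LogIt that collects lines in a list instead of a file:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var lines = new List<string>();  // stands in for the log file

// Hypothetical variant of LogIt that also returns the consumer task.
(ChannelWriter<string> Writer, Task Completion) StartLogger(List<string> sink)
{
    var channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions { SingleReader = true });
    var consumer = Task.Run(async () =>
    {
        // ReadAllAsync keeps yielding buffered items after the writer completes
        // and only ends once the channel is both completed and empty.
        await foreach (var msg in channel.Reader.ReadAllAsync())
            sink.Add(msg);
    });
    return (channel.Writer, consumer);
}

var (logWriter, logCompletion) = StartLogger(lines);
for (int i = 0; i < 1000; i++)
    logWriter.TryWrite($"message {i}");

logWriter.Complete();  // no more messages will be written
await logCompletion;   // wait until the reader has drained the buffer
Console.WriteLine(lines.Count); // 1000
```

channel.Reader.Completion is another option, but it completes once the last item has been *read*, which can be slightly before that item has been processed; awaiting the consumer task covers the processing too.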

The line

.ContinueWith(t=>writer.TryComplete(t.Exception));

is needed to ensure the channel is closed even if an exception occurs or the cancellation token is signaled.
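A minimal sketch of what that continuation buys, where the consumer throws on a hypothetical "bad" message (the exception type and text are made up for the demo):

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();
var writer = channel.Writer;

// Consumer that fails on a "bad" message, followed by the same
// ContinueWith used in LogIt: it completes the channel with the exception.
var closed = Task.Run(async () =>
{
    await foreach (var msg in channel.Reader.ReadAllAsync())
    {
        if (msg == "bad")
            throw new InvalidOperationException("disk full");
    }
}).ContinueWith(t => writer.TryComplete(t.Exception));

writer.TryWrite("ok");
writer.TryWrite("bad");
await closed;  // wait until the continuation has run

// The channel is now closed, so producers find out about the failure.
bool accepted = writer.TryWrite("after failure");
Console.WriteLine(accepted); // False
```

Without the continuation, the faulted consumer would go unnoticed and TryWrite would keep returning true on the unbounded channel, queueing messages that nobody reads.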

This may seem cumbersome at first, but channels make it easy to run initialization code or carry state from one message to the next. For example, we could open a stream before the loop starts and use it instead of reopening the file each time we call File.AppendAllText:

public ChannelWriter<string> LogIt(string path,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
    _=Task.Run(async ()=>{
       //***** Can't do this with an ActionBlock ****
        // Open the file once; a distinct name avoids clashing with the channel's writer
        using(var file=File.AppendText(path))
        {
            await foreach(var msg in channel.Reader.ReadAllAsync(token))
            {
                file.WriteLine(msg);
                //Or
                //await file.WriteLineAsync(msg);
            }
        }
    },token).ContinueWith(t=>writer.TryComplete(t.Exception));
    return writer;
}
Panagiotis Kanavos
  • Best example of `Channel` I have found! One question though. Let's say that all logs have been written to the channel and the app exits. We then want all logs to be written to disk. Is the `ReadAllAsync()` call guaranteed to consume all messages or do we need to wait for the reader to catch up? If so, how? – l33t May 25 '23 at 22:12

Task.Delay is definitely better than Thread.Sleep, because it doesn't block a thread-pool thread: during the wait, that thread is available to handle other tasks. In that case you also don't need to make your task long-running. Long-running tasks run on a dedicated thread, which defeats the purpose of Task.Delay.
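A minimal sketch of the question's loop with both changes applied: Task.Delay instead of Thread.Sleep, and TryDequeue (as suggested in the comments) instead of checking Count. The 50 ms `pollInterval` is an assumed value standing in for `backgroundUiTimeOut`, and the `consumed` list stands in for `ConsumeMsgQueue`:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var globalMsgQueue = new ConcurrentQueue<string>();
var consumed = new List<string>();                 // stands in for ConsumeMsgQueue
var uiCts = new CancellationTokenSource();
var pollInterval = TimeSpan.FromMilliseconds(50);  // assumed value

var backgroundUiTask = Task.Run(async () =>
{
    while (!uiCts.IsCancellationRequested)
    {
        // TryDequeue replaces the Count check: it takes an item only if one is there.
        while (globalMsgQueue.TryDequeue(out var msg))
            consumed.Add(msg);

        try
        {
            // The pool thread is released while waiting, unlike Thread.Sleep.
            await Task.Delay(pollInterval, uiCts.Token);
        }
        catch (OperationCanceledException)
        {
            break;
        }
    }
    // Drain whatever arrived during the last delay before exiting.
    while (globalMsgQueue.TryDequeue(out var msg))
        consumed.Add(msg);
});

globalMsgQueue.Enqueue("first");
globalMsgQueue.Enqueue("second");
uiCts.CancelAfter(200);
await backgroundUiTask;
Console.WriteLine(string.Join(",", consumed)); // first,second
```

This still polls, so a message can wait up to one interval before it is handled; the queue-based classes in the other answers avoid that delay.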

Instead, I would recommend a different approach: just use System.Threading.Timer and make your life simple. A timer runs its callback on the thread pool, so you don't have to worry about delaying or sleeping at all.
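A minimal sketch of the timer approach, assuming the queue is drained on each tick. The `Drain` callback, the `consumed` list and the 50 ms period are illustrative; the `drained` event only exists to end the demo:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

var globalMsgQueue = new ConcurrentQueue<string>();
var consumed = new List<string>();
var drained = new ManualResetEventSlim(false);  // only used to end the demo

void Drain(object? state)
{
    // Each tick runs on a thread-pool thread; no thread is blocked between ticks.
    while (globalMsgQueue.TryDequeue(out var msg))
    {
        lock (consumed) { consumed.Add(msg); }
    }
    lock (consumed)
    {
        if (consumed.Count >= 2) drained.Set();
    }
}

globalMsgQueue.Enqueue("first");
globalMsgQueue.Enqueue("second");

// dueTime = 0: first tick immediately; period = 50 ms between ticks.
using var timer = new Timer(Drain, null, TimeSpan.Zero, TimeSpan.FromMilliseconds(50));
bool done = drained.Wait(TimeSpan.FromSeconds(5));
Console.WriteLine(done); // True
```

One caveat: if a tick takes longer than the period, the next callback can start while the previous one is still running, which is why the sketch locks around the shared list.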

Nick

The TPL Dataflow library is the preferred tool for this kind of job. It allows building efficient producer-consumer pairs quite easily, and more complex pipelines as well, while offering a complete set of configuration options. In your case using a single ActionBlock should be enough.

A simpler solution you might consider is to use a BlockingCollection. It has the advantage of not requiring the installation of any package (because it is built-in), and it's also much easier to learn. You don't have to learn more than the methods Add, CompleteAdding, and GetConsumingEnumerable. It also supports cancellation. The drawback is that it's a blocking collection, so it blocks the consumer thread while waiting for new messages to arrive, and the producer thread while waiting for available space in the internal buffer (only if you specify a boundedCapacity in the constructor).

var uiCts = new CancellationTokenSource();
var globalMsgQueue = new BlockingCollection<string>();

var backgroundUiTask = new Task(() =>
{
    foreach (var item in globalMsgQueue.GetConsumingEnumerable(uiCts.Token))
    {
        ConsumeMsgQueueItem(item);
    }
}, uiCts.Token);

The BlockingCollection uses a ConcurrentQueue internally as a buffer.
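A runnable sketch of the example above, with a `consumed` list standing in for the hypothetical ConsumeMsgQueueItem. It calls CompleteAdding for a graceful shutdown instead of cancelling: the consuming loop then finishes the buffered items and ends, whereas cancelling GetConsumingEnumerable(token) throws OperationCanceledException and can leave items unprocessed. It uses TaskCreationOptions.LongRunning because the consumer blocks a thread while waiting:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var globalMsgQueue = new BlockingCollection<string>(boundedCapacity: 100);
var consumed = new List<string>();  // stands in for ConsumeMsgQueueItem

// StartNew creates and starts the task in one step; LongRunning asks for a
// dedicated thread, since GetConsumingEnumerable blocks while the buffer is empty.
var backgroundUiTask = Task.Factory.StartNew(() =>
{
    foreach (var item in globalMsgQueue.GetConsumingEnumerable())
        consumed.Add(item);
}, TaskCreationOptions.LongRunning);

for (int i = 0; i < 5; i++)
    globalMsgQueue.Add($"status {i}");  // would block only if 100 items were pending

globalMsgQueue.CompleteAdding();  // the loop ends once the buffer is empty
await backgroundUiTask;
Console.WriteLine(consumed.Count); // 5
```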

Theodor Zoulias