5

I already used BackgroundWorker and Task to do something in the background and after it posting it back to the UI. I even used BackgroundWorker and ReportProgress with an endless-loop (beside cancellation) to continuously post things to the UI thread.

But this time I need a more controllable scenario: The background thread continuously polls other systems. With Invoke it can send updates the the UI. But how can the UI send message to the background thread? Like changed settings.

In fact I am asking for the best .NET practice to have a worker thread with these specifics:

  • Runs in background, does not block UI
  • Can send updates to UI (Invoke, Dispatch)
  • Runs in endless loop but can be paused, resumed and stopped in a proper way
  • UI thread can send updated settings to the background thread

In my scenario I still use WinForms but I guess it should not matter? I will convert the application to WPF later.

Which best practice do you suggest?

Kristof U.
  • 1,263
  • 10
  • 17
ZoolWay
  • 5,411
  • 6
  • 42
  • 76
  • Any additional suggestions (to BrokenGlass)? What about `SynchronizationContext`? – ZoolWay Feb 22 '14 at 23:08
  • `SynchronizationContext` might work, but that would be a low-level abstract. [My take](http://stackoverflow.com/a/21968329/1768303) is to use TPL and a custom task scheduler. – noseratio Feb 23 '14 at 23:08

2 Answers2

4

I would use TPL and a custom task scheduler for this, similar to Stephen Toub's StaTaskScheduler. That's what WorkerWithTaskScheduler implements below. In this case, the worker thread is also a task scheduler, which can run arbitrary Task items (with ExecutePendingTasks) while doing the work on its main loop.

Executing a lambda wrapped as a TPL Task on the worker thread's context is a very convenient way to send the worker thread a message and get back the result. This can be done synchrounsly with WorkerWithTaskScheduler.Run().Wait/Result or asynchronously with await WorkerWithTaskScheduler.Run(). Note how ContinueExecution and WaitForPendingTasks are used to pause/resume/end the worker's main loop. I hope the code is self-explanatory, but let me know if I should clarify anything.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            Console.WriteLine("Initial thread: " + Thread.CurrentThread.ManagedThreadId);

            // the worker thread lambda
            Func<WorkerWithTaskScheduler<int>, int> workAction = (worker) =>
            {
                var result = 0;

                Console.WriteLine("Worker thread: " + Thread.CurrentThread.ManagedThreadId);

                while (worker.ContinueExecution)
                {
                    // observe cancellation
                    worker.Token.ThrowIfCancellationRequested();

                    // executed pending tasks scheduled with WorkerWithTaskScheduler.Run
                    worker.ExecutePendingTasks();

                    // do the work item
                    Thread.Sleep(200); // simulate work payload
                    result++;

                    Console.Write("\rDone so far: " + result);
                    if (result > 100)
                        break; // done after 100 items
                }
                return result;
            };

            try
            {
                // cancel in 30s
                var cts = new CancellationTokenSource(30000);
                // start the worker
                var worker = new WorkerWithTaskScheduler<int>(workAction, cts.Token);

                // pause upon Enter
                Console.WriteLine("\nPress Enter to pause...");
                Console.ReadLine();
                worker.WaitForPendingTasks = true;

                // resume upon Enter
                Console.WriteLine("\nPress Enter to resume...");
                Console.ReadLine();
                worker.WaitForPendingTasks = false;

                // send a "message", i.e. run a lambda inside the worker thread
                var response = await worker.Run(() =>
                {
                    // do something in the context of the worker thread
                    return Thread.CurrentThread.ManagedThreadId;
                }, cts.Token);
                Console.WriteLine("\nReply from Worker thread: " + response);

                // End upon Enter
                Console.WriteLine("\nPress Enter to stop...");
                Console.ReadLine();

                // worker.EndExecution() to get the result gracefully
                worker.ContinueExecution = false; // or worker.Cancel() to throw
                var result = await worker.WorkerTask;

                Console.Write("\nFinished, result: " + result);
            }
            catch (Exception ex)
            {
                while (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.WriteLine("\nPress Enter to Exit.");
            Console.ReadLine();
        }
    }

    //
    // WorkerWithTaskScheduler
    //

    public class WorkerWithTaskScheduler<TResult> : TaskScheduler, IDisposable
    {
        readonly CancellationTokenSource _workerCts;
        Task<TResult> _workerTask;

        readonly BlockingCollection<Task> _pendingTasks;
        Thread _workerThread;

        volatile bool _continueExecution = true;
        volatile bool _waitForTasks = false;

        // start the main loop
        public WorkerWithTaskScheduler(
            Func<WorkerWithTaskScheduler<TResult>, TResult> executeMainLoop,
            CancellationToken token)
        {
            _pendingTasks = new BlockingCollection<Task>();
            _workerCts = CancellationTokenSource.CreateLinkedTokenSource(token);

            _workerTask = Task.Factory.StartNew<TResult>(() =>
            {
                _workerThread = Thread.CurrentThread;
                return executeMainLoop(this);
            }, _workerCts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }

        // a sample action for WorkerWithTaskScheduler constructor
        public static void ExecuteMainLoop(WorkerWithTaskScheduler<TResult> worker)
        {
            while (!worker.ContinueExecution)
            {
                worker.Token.ThrowIfCancellationRequested();
                worker.ExecutePendingTasks();
            }
        }

        // get the Task
        public Task<TResult> WorkerTask
        {
            get { return _workerTask; }
        }

        // get CancellationToken 
        public CancellationToken Token
        {
            get { return _workerCts.Token; }
        }

        // check/set if the main loop should continue
        public bool ContinueExecution
        {
            get { return _continueExecution; }
            set { _continueExecution = value; }
        }

        // request cancellation
        public void Cancel()
        {
            _workerCts.Cancel();
        }

        // check if we're on the correct thread
        public void VerifyWorkerThread()
        {
            if (Thread.CurrentThread != _workerThread)
                throw new InvalidOperationException("Invalid thread.");
        }

        // check if the worker task itself is still alive
        public void VerifyWorkerTask()
        {
            if (_workerTask == null || _workerTask.IsCompleted)
                throw new InvalidOperationException("The worker thread has ended.");
        }

        // make ExecutePendingTasks block or not block
        public bool WaitForPendingTasks
        {
            get { return _waitForTasks; }
            set 
            { 
                _waitForTasks = value;
                if (value) // wake it up
                    Run(() => { }, this.Token);
            }
        }

        // execute all pending tasks and return
        public void ExecutePendingTasks()
        {
            VerifyWorkerThread();

            while (this.ContinueExecution)
            {
                this.Token.ThrowIfCancellationRequested();

                Task item;
                if (_waitForTasks)
                {
                    item = _pendingTasks.Take(this.Token);
                }
                else
                {
                    if (!_pendingTasks.TryTake(out item))
                        break;
                }

                TryExecuteTask(item);
            }
        }

        //
        // TaskScheduler methods
        //

        protected override void QueueTask(Task task)
        {
            _pendingTasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _pendingTasks.ToArray();
        }

        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            return _workerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_workerTask != null)
            {
                _workerCts.Cancel();
                _workerTask.Wait();
                _pendingTasks.Dispose();
                _workerTask = null;
            }
        }

        //
        // Task.Factory.StartNew wrappers using this task scheduler
        //

        public Task Run(Action action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task<T> Run<T>(Func<T> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            VerifyWorkerTask();
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}

To implement worker-to-client notifications, you can use the IProgress<T> pattern (example of this).

Community
  • 1
  • 1
noseratio
  • 59,932
  • 34
  • 208
  • 486
  • This looks very interesting. I will check it out. Thanks for sharing that much information! May I ask what you edited? – ZoolWay Feb 24 '14 at 10:28
1

First thing that comes to mind, and the cleanest approach imo is to have the background thread method that is continuously running be an instance method of a class. This class instance can then expose properties/methods that allow others to change state (e.g. through the UI) - some locking may be required since you are reading/updating state from different threads.

BrokenGlass
  • 158,293
  • 28
  • 286
  • 335
  • With this you mean do to `new Thread(new ThreadStart(threadWorkerInstance.RunMethod));` and manipulate the `threadWorkerInstance` from main thread and have `RunMethod` looking for them? – ZoolWay Feb 22 '14 at 21:54
  • Yes, essentially - don't forget to put locks in place to handle concurrency. – BrokenGlass Feb 22 '14 at 23:03