2

Let's say I have a UI thread and a background thread that subscribe to a custom thread-safe ObservableCollection that I created so that whenever the collection changes it executes the callback within the appropriate context.

Now let's say I add something to the collection (from either thread, doesn't matter which one) and it now has to marshal the callback to both threads. To execute the callback within the UI's context I can simply call Dispatcher.Invoke(...); great.

Now I want to execute the callback within the background thread's context (don't ask me why, it may well be that whatever it's accessing is affinitized to that specific thread or has thread-local storage it needs to access); how would I do that?

Background threads don't have a dispatcher/message pumping mechanism so I can't use a dispatcher or SynchronizationContext, so how would one interrupt a background thread and have it execute my callback within its context?

EDIT: I keep getting answers that are obviously wrong, so I must not have explained myself correctly. Forget the UI thread and UI dispatchers, guys; they were only meant to marshal calls to the UI thread, that's it! Imagine two worker threads A and B. If A modifies my collection, then A is in charge of marshalling the callback to itself and to B. Executing the callback within A's context is easy since A was the one triggering it: simply call the delegate in place. Now A needs to marshal the callback to B... now what? Dispatcher and SynchronizationContext are useless in this situation.

noseratio
Anthony
  • I would look into using reflection to invoke the callback. – Travis J Apr 11 '14 at 07:50
  • Is it also a valid scenario when you want to execute a callback into the worker thread from inside the `Dispatcher.Invoke` callback made from the worker thread? – noseratio Apr 11 '14 at 08:54
  • @TravisJ : I don't understand how reflection will help. – Anthony Apr 11 '14 at 14:17
  • @Noseratio : Forget dispatcher.invoke, forget the ui thread. Imagine I have 2 worker threads and I want to dispatch my event to both worker threads; what can I use? – Anthony Apr 11 '14 at 14:18
  • I'd implement it [this way](http://stackoverflow.com/a/23016220/1768303). – noseratio Apr 12 '14 at 03:52

3 Answers

2

Another option is to derive your own class from TaskScheduler. You will have to implement three methods:

QueueTask, TryExecuteTaskInline and GetScheduledTasks

you can read about it here

That way, anytime you need to run something on your dedicated thread you could just do:

Task.Factory.StartNew(() => { SomeAction(); }, SomeCancellationToken, TaskCreationOptions.None,
            new MyTaskScheduler());

and have it execute on your thread.
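As a rough illustration of the three overrides named above, here is a minimal sketch of a scheduler that owns one dedicated thread. The names `SingleThreadTaskScheduler` and `SchedulerDemo` are hypothetical, and the blocking queue is only one possible way to feed the thread; treat it as a starting point under those assumptions, not a production implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name: a scheduler that runs every queued task on one dedicated thread.
sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    readonly Thread _thread;

    public SingleThreadTaskScheduler(string threadName)
    {
        _thread = new Thread(() =>
        {
            // Blocks while the queue is empty; the loop ends after CompleteAdding().
            foreach (var task in _tasks.GetConsumingEnumerable())
                TryExecuteTask(task);
        })
        { IsBackground = true, Name = threadName };
        _thread.Start();
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Inline only when already on the dedicated thread; a waiter on any other
        // thread must not run the task itself. If the task was queued earlier, its
        // stale queue entry is skipped later because TryExecuteTask returns false.
        return Thread.CurrentThread == _thread && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray(); // snapshot, used by debuggers
    }

    public void Dispose()
    {
        _tasks.CompleteAdding();
        _thread.Join(); // assumption: Dispose is never called from the dedicated thread
    }
}

static class SchedulerDemo
{
    // Returns the name of the thread that actually ran the delegate.
    public static string Run()
    {
        using (var scheduler = new SingleThreadTaskScheduler("Worker B"))
        {
            return Task.Factory.StartNew(
                () => Thread.CurrentThread.Name,
                CancellationToken.None, TaskCreationOptions.None, scheduler).Result;
        }
    }
}
```

Calling `SchedulerDemo.Run()` from any thread blocks until the delegate has run on the dedicated thread, which also gives a synchronous call. Note that the target thread is one the scheduler created and owns; an arbitrary busy thread cannot be targeted this way.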

Yuval Itzchakov
  • Apart from the fact that the TPL is a total bag of spanners. A SynchronizationContext provides the abstraction without the need to get involved in that stuff. – satnhak Apr 11 '14 at 08:43
  • I think it's a matter of taste. He may choose whichever is easier for him to implement and maintain, I guess – Yuval Itzchakov Apr 11 '14 at 08:46
  • @TheMouthofaCow : A synchronization context will not help; it will only marshal calls to a UI thread but not to a background thread. If the target thread is a background thread it will simply execute it in the current thread (whatever that may be). – Anthony Apr 11 '14 at 14:16
  • @Yuval : How do I target specific threads using a task scheduler and how can it be done synchronously? And this is simply creating work on new threads instead of delegating the work to existing threads. – Anthony Apr 11 '14 at 14:25
0

We have a component that must always run on the same STA background thread. We've achieved this by writing our own SynchronizationContext. This article is very helpful.

To summarise, you don't want to interrupt your worker thread, you want it to sit idle waiting for the next task that it should execute. You add jobs to a queue and it processes those jobs in order. The SynchronizationContext is a convenient abstraction around that idea. The SynchronizationContext is the owner of the worker thread - and the outside world does not interact with the thread directly: callers who want to execute a task on the worker thread make the request to the context which adds the job to the job queue. The worker is either working or polling the queue until another job is added, at which point it begins working again.

Update

Here is an example:

using System.Collections.Concurrent;
using System.Diagnostics; // Trace, used in the usage snippet below
using System.Threading;

class LoadBalancedContext : SynchronizationContext
{
    readonly Thread thread1;

    readonly Thread thread2;

    readonly ConcurrentQueue<JobInfo> jobs = new ConcurrentQueue<JobInfo>();

    public LoadBalancedContext()
    {
        this.thread1 = new Thread(this.Poll) { Name = "T1" };
        this.thread2 = new Thread(this.Poll) { Name = "T2" };

        this.thread1.Start();
        this.thread2.Start();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        this.jobs.Enqueue(new JobInfo { Callback = d, State = state });
    }

    void Poll()
    {
        while (true)
        {
            JobInfo info;
            if (this.jobs.TryDequeue(out info))
            {
                info.Callback(info.State);
            }
            else
            {
                // back off only when the queue is empty, so queued jobs are not delayed
                Thread.Sleep(100);
            }
        }
    }

    class JobInfo
    {
        public SendOrPostCallback Callback { get; set; }

        public object State { get; set; }
    }
}

Usage:

var context = new LoadBalancedContext();

SendOrPostCallback callback = x =>
    {
        Trace.WriteLine(Thread.CurrentThread.Name);
        Thread.Sleep(200);
    };

// queue eleven jobs; whichever thread dequeues a job first runs it
for (int i = 0; i < 11; i++)
{
    context.Post(callback, null);
}

Thread.Sleep(1000);

The Send case is slightly more involved, as you will need to wait on a reset event. This is not production quality, but it should give you an idea of what you need to do.
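The Send case mentioned above could look roughly like the following sketch. It is not taken from the linked article; the names `SingleThreadContext` and `SendDemo` are hypothetical, and it assumes a single dedicated thread fed by a blocking queue rather than the two polling threads shown earlier. The caller queues the job together with a `ManualResetEventSlim` and blocks until the worker signals it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading;

// Hypothetical name: a context that owns one thread and supports a blocking Send.
sealed class SingleThreadContext : SynchronizationContext, IDisposable
{
    readonly BlockingCollection<Action> _jobs = new BlockingCollection<Action>();
    readonly Thread _thread;

    public SingleThreadContext(string name)
    {
        _thread = new Thread(Pump) { IsBackground = true, Name = name };
        _thread.Start();
    }

    void Pump()
    {
        SetSynchronizationContext(this);
        // Blocks while idle instead of sleeping and polling.
        foreach (var job in _jobs.GetConsumingEnumerable())
            job();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        _jobs.Add(() => d(state)); // fire and forget
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        if (Thread.CurrentThread == _thread)
        {
            d(state); // already on the target thread; queuing and waiting would deadlock
            return;
        }

        Exception error = null;
        using (var done = new ManualResetEventSlim(false))
        {
            _jobs.Add(() =>
            {
                try { d(state); }
                catch (Exception ex) { error = ex; }
                finally { done.Set(); }
            });
            done.Wait(); // block the caller until the worker has run the callback
        }

        if (error != null)
            throw new TargetInvocationException(error); // surface worker-side failures
    }

    public void Dispose()
    {
        _jobs.CompleteAdding();
        _thread.Join(); // assumption: Dispose is never called from the worker thread
    }
}

static class SendDemo
{
    // Returns the name of the thread that ran the callback.
    public static string Run()
    {
        using (var context = new SingleThreadContext("Worker B"))
        {
            string name = null;
            context.Send(_ => name = Thread.CurrentThread.Name, null);
            return name;
        }
    }
}
```

The callback still only runs when the worker thread gets back to its queue; if that thread is busy with a long job, `Send` waits until the job finishes.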

Hope that helps.

satnhak
  • But the SynchronizationContext will not do that to a worker thread. First off, a thread doesn't start off with a SyncContext; it needs to be created. UI threads get a SyncContext automatically, but worker threads don't have one, so it has to be created, and a SyncContext on a worker thread does *no marshalling* to another worker thread; it executes the callback in whatever thread calls it. Forget about UI threads and let's imagine I have 2 worker threads and I have a callback that needs to execute on both of them; how can I get it to execute within each worker's context? – Anthony Apr 11 '14 at 14:23
  • You are thinking about the problem in the wrong way. You MUST let the context own the (two) threads. The main thread is special and has lots of funky magic around it. It already has the ability to live forever and to have tasks queued on it. There is no mechanism that allows you to do that with an arbitrary thread, so you need to create a queue of jobs and then poll that queue. I promise that article does exactly what you want. The guy overcomplicates things, but honestly if you read that it will tell you what you want to know. – satnhak Apr 11 '14 at 21:32
  • You're essentially recreating the task pool and I appreciate the alternative but it doesn't answer my question which I now believe is 'no, not possible'. Actually you already answered it in the comment saying there's no mechanism to do this and you're right; I don't expect there to be some message pump on a background thread. I was just wondering if there was any facility in place anywhere in Win32 or CLR land that would allow marshalling to a worker thread without creating your own pump/custom blocking queue thread. – Anthony Apr 12 '14 at 03:08
  • I think you misunderstand what a thread actually is. Conceptually all a thread is is a sequence of instructions. Once the sequence of instructions is exhausted the thread is complete. To achieve your goal what you want to do is prevent that sequence of instructions from becoming exhausted and to be able to insert more instructions into that sequence in such a way that they are executed as soon as possible. There is a mechanism to do this which is to maintain a queue of work and poll it in an infinite loop. – satnhak Apr 12 '14 at 10:14
  • Trust me, I understand threading at a deep level. What I'm asking isn't a run-of-the-mill kinda question that the average developer can answer; I'm looking to see if there's an exotic way of thread injection. A thread follows an execution path over a sequence of instructions, a path that may or may not overlap with other threads. In theory it's possible to inject a call into a target thread space by modifying the thread's execution pointer (which is saved during context switch) with that of the call and have the call return to the original execution pointer. This is low level stuff. – Anthony Apr 12 '14 at 21:26
  • I was wondering if there were any facilities in CLR/Win32 that allows this considering that the garbage collector is able to do something similar to this. – Anthony Apr 12 '14 at 21:27
  • @Anthony Maybe not deeply enough! – satnhak Apr 13 '14 at 10:33
  • After writing a mini-os for a custom FPGA cpu and an ansi-c compiler for it... yeah, i think it's deep enough! :P – Anthony Apr 14 '14 at 21:06
0

Forget dispatcher.invoke, forget the ui thread. Imagine I have 2 worker threads and I want to dispatch my event to both worker threads; what can I use?

I'd use two task schedulers for this (as @YuvalItzchakov's answer suggests), one for each thread. I'd also use a custom synchronization context for the worker thread, as @TheMouthofaCow's answer suggests.

That is, for a UI thread, I'd just save and use TaskScheduler.FromCurrentSynchronizationContext(). For the worker thread, I would start a thread and install a custom synchronization context on it, then use FromCurrentSynchronizationContext too.

Something like this (untested):

// UI thread
var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

using (var worker = new ThreadWithPumpingSyncContext())
{
    // call the worker thread
    var result = await worker.Run(async () => 
    {
        // worker thread
        await Task.Delay(1000);

        // call the UI thread
        await Task.Factory.StartNew(async () => 
        {
            // UI thread
            await Task.Delay(2000);
            MessageBox.Show("UI Thread!");

            // call the worker thread
            await worker.Run(() => 
            {
                // worker thread
                Thread.Sleep(3000);
            });

            // UI thread
            await Task.Delay(4000);
        }, CancellationToken.None, TaskCreationOptions.None, uiTaskScheduler).Unwrap();

        // worker thread
        await Task.Delay(5000);
        return Type.Missing; // or implement a non-generic version of Run
    });
}

// ...

// ThreadWithSerialSyncContext renamed to ThreadWithPumpingSyncContext
class ThreadWithPumpingSyncContext : SynchronizationContext, IDisposable
{
    public readonly TaskScheduler Scheduler; // can be used to run tasks on the pumping thread
    readonly Task _mainThreadTask; // wrap the pumping thread as Task
    readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>();

    // track async void methods
    readonly object _lock = new Object();
    volatile int _pendingOps = 0; // the number of pending async void method calls
    volatile TaskCompletionSource<Empty> _pendingOpsTcs = null; // to wait for pending async void method calls

    public ThreadWithPumpingSyncContext()
    {
        var tcs = new TaskCompletionSource<TaskScheduler>();
        _mainThreadTask = Task.Factory.StartNew(() =>
        {
            try
            {
                SynchronizationContext.SetSynchronizationContext(this);
                tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext());

                // pumping loop
                foreach (var action in _actions.GetConsumingEnumerable())
                    action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(null);
            }
        }, TaskCreationOptions.LongRunning);

        Scheduler = tcs.Task.Result;
    }

    // SynchronizationContext methods
    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public override void OperationStarted()
    {
        lock (_lock)
        {
            if (_pendingOpsTcs != null && _pendingOpsTcs.Task.IsCompleted)
                throw new InvalidOperationException("OperationStarted"); // shutdown requested
            _pendingOps++;
        }
    }

    public override void OperationCompleted()
    {
        lock (_lock)
        {
            _pendingOps--;
            if (0 == _pendingOps && null != _pendingOpsTcs)
                _pendingOpsTcs.SetResult(Empty.Value);
        }
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        _actions.Add(() => d(state));
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotImplementedException("Send");
    }

    // Task start helpers
    public Task Run(Action action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler);
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
    }

    public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
    }

    // IDisposable
    public void Dispose()
    {
        var disposingAlready = false;

        lock (_lock)
        {
            disposingAlready = null != _pendingOpsTcs;
            if (!disposingAlready)
            {
                // do not allow new async void method calls
                _pendingOpsTcs = new TaskCompletionSource<Empty>();
                if (0 == _pendingOps)
                    _pendingOpsTcs.TrySetResult(Empty.Value);
            }
        }

        // outside the lock
        if (!disposingAlready)
        {
            // wait for pending async void method calls
            _pendingOpsTcs.Task.Wait();

            // request the end of the pumping loop
            _actions.CompleteAdding();
        }

        _mainThreadTask.Wait();
    }

    struct Empty { public static readonly Empty Value = default(Empty); }
}

This gives you a form of cooperative asynchronous execution between two threads.

noseratio
  • Okay so let me get this straight... if both worker threads A and B are busy bees, always working, and thread A happens to modify the collection while it's working... Thread A can execute the callback within its own context easily; you're saying with this method that thread B, which is busy doing calculations, will be interrupted and will execute the callback within its context and return back to its frame of execution? – Anthony Apr 11 '14 at 15:12
  • @Anthony, there's no way you can interrupt a *busy* thread, besides perhaps with `Thread.Abort`. The thread has to be *ready* and *waiting* to accept an incoming call; it's about cooperative synchronization. That's what `await` does above. In the case of the UI thread, it "yields" to the core message loop. In the case of the worker thread, it "yields" to the `foreach` loop over `GetConsumingEnumerable`. You can think of this as coroutines executed by separate threads. – noseratio Apr 11 '14 at 15:30
  • This is my fault for not being very specific in my question which I believe is simply 'no, can't be done'. I was looking for perhaps an undocumented feature that might allow this kind of craziness... for example, the CLR during garbage collection stack-walk will actually inject code in certain places so I thought even though my question may be far fetched it might be possible via some contrived mechanism such as the code-injection I referred to. – Anthony Apr 12 '14 at 03:13
  • @Anthony, that'd be crazy indeed and wouldn't make any sense. **Where do you think a loop like `for (var i = 0; i < 100000; i++) sum += i` should be interrupted to run your code, and most importantly, what for?** You need an explicit synchronization point for this. In earlier .NET versions you'd use synchronization primitives like `ManualResetEvent`. Nowadays, TPL and `async/await` make them pretty much redundant for high-level parallel programming. – noseratio Apr 12 '14 at 03:48
  • True, it's crazy and I'm not asking for any practical purpose, it's purely hypothetical. – Anthony Apr 12 '14 at 21:29