6

Say I have 10 threads busily doing something and they sometimes call a method

public HandleWidgets(Widget w) {  HeavyLifting(w) }

However, I don't want my 10 threads to wait on HeavyLifting(w), but instead, dispatch the HeavyLifting(w) work to an 11th thread, the HeavyLifter thread and continue asynchronously. The HeavyLifter thread dispatched to should always be the same thread, and I don't want to make multiple threads (hence, I can't do something quite like this: C# Asynchronous call without EndInvoke?).

HeavyLifting(w) is "fire and forget" in that the threads that call HandleWidgets() don't need a callback or anything like that.

What's a healthy practice for this?

Community
  • 1
  • 1
John
  • 565
  • 1
  • 6
  • 18
  • 3
    Your thread setup is strange. Rather than parceling out the "heavy lifting" to multiple threads, you parcel out the parceling out to multiple threads, and then bog down a single thread with the real work. This is the *opposite* of what you generally want when trying to reap gains from multiple cores. – dlev Dec 20 '13 at 23:34
  • I have edited your title. Please see, "[Should questions include “tags” in their titles?](http://meta.stackexchange.com/questions/19190/)", where the consensus is "no, they should not". – John Saunders Dec 21 '13 at 00:25
  • Hi Dlev, it is unusual. My real-world application is that the ten threads are interfacing with external hardware, and the eleventh thread is solving a problem pertaining to data collated from the hardware. I shouldn't say more than that ;) – John Dec 21 '13 at 02:08

5 Answers5

2

I'm surprised none of the other answers here mention TPL DataFlow. You connect a bunch of blocks together and post data through the chain. You can control the concurrency of each block explicitly, so you could do the following:

var transformBlk =
    new TransformBlock<int,int>(async i => {
        await Task.Delay(i);
        return i * 10;
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 10});

var processorBlk=
    new ActionBlock<int>(async i => {
        Console.WriteLine(i);
    },new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 1});

transformBlk.LinkTo(processorBlk);
var data = Enumerable.Range(1, 20).Select(x => x * 1000);
foreach(var x in data)
{
    transformBlk.Post(x);
}
spender
  • 117,338
  • 33
  • 229
  • 351
1

Create a queue which is shared among all threads and one semaphore, which is also shared among all threads. Worker threads (that should not wait for HeavyLifter) post requests like this:

lock (queue)
{
    queue.Enqueue(request);
    semaphore.Release();
}

HeavyLifter is a background thread (doesn't stop process from exiting) and runs the following code in infinite loop:

while (true)
{
    semaphore.WaitOne();
    Request item = null
    lock (queue)
    {
        item = queue.Dequeue();
    }
    this.ProcessRequest(item);
}

EDIT: typo.

Zoran Horvat
  • 10,924
  • 3
  • 31
  • 43
1

You can create a limited concurrency TaskScheduler to be used as a Task factory, as provided in this example from MSDN, with limitation to single thread:

var lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

The implement your function as:

public HandleWidgets(Widget w) 
{  
    factory.StartNew(() => HeavyLifting(w));
}
Konrad Kokosa
  • 16,563
  • 2
  • 36
  • 58
1

--- EDIT ---

I just noticed you need "fire and forget", in which case a blocking collection alone would be enough. The solution below is really for more complex scenarios where you need to return a result, or propagate an exception, or compose tasks in some fashion (e.g. via async/await) etc...


Use TaskCompletionSource to expose the work done in the "synchronous" thread as a Task-based API to the "client" threads.

For each invocation of HandleWidgets (CT = "client thread", "ST" = synchronous thread):

  • CT: Create a separate TaskCompletionSource.
  • CT: Dispatch HeavyLifting to the ST (probably through a BlockingCollection; also pass the TaskCompletionSource to it, so it can do the last step below).
  • CT: Return TaskCompletionSource's Task to the caller without waiting for the work on ST to finish.
  • CT: Continue normally. If/when it is no longer possible to continue without waiting on HeavyLifting to finish (in the ST), wait on the task above.
  • ST: When HeavyLifting finishes, call SetResult (or SetException or SetCanceled, as appropriate), which unblocks any CTs that might currently wait on the task.
Branko Dimitrijevic
  • 50,809
  • 10
  • 93
  • 167
1

Basically you have threads that are producers of work and one thread that is a consumer of it.

Create a thread and have it Take from a BlockingCollection in a loop. This is your consumer thread which will call HeavyLifting. It will simply wait until an item is available and then proces it:

A call to Take may block until an item is available to be removed.

The other threads can simply add items to the collection.

Note that BlockingCollection doesn't guarantee ordering of items added/removed by itself:

The order in which an item is removed depends on the type of collection used to create the BlockingCollection instance. When you create a BlockingCollection object, you can specify the type of collection to use

ta.speot.is
  • 26,914
  • 8
  • 68
  • 96