21

I have a couple of situations in my code where various threads can create work items that, for various reasons, shouldn't be done in parallel. I'd like to make sure the work gets done in a FIFO manner, regardless of what thread it comes in from. In Java, I'd put the work items on a single-threaded ExecutorService; is there an equivalent in C#? I've cobbled something together with a Queue and a bunch of lock(){} blocks, but it'd be nice to be able to use something off-the-shelf and tested.

Update: Does anybody have experience with System.Threading.Tasks? Does it have a solution for this sort of thing? I'm writing a Monotouch app so who knows if I could even find a backported version of it that I could get to work, but it'd at least be something to think about for the future.

Update #2: For C# developers unfamiliar with the Java libraries I'm talking about, basically I want something that lets various threads hand off work items such that all those work items will be run on a single thread (which isn't any of the calling threads).


Update, 6/2018: If I was architecting a similar system now, I'd probably use Reactive Extensions as per Matt Craig's answer. I'm leaving Zachary Yates' answer the accepted one, though, because if you're thinking in Rx you probably wouldn't even ask this question, and I think ConcurrentQueue is easier to bodge into a pre-Rx program.

David Moles
  • If work should get done in a FIFO manner, why create "various" threads at all? Why not do the work on a single thread? – Mark Jan 11 '11 at 19:40
  • @Mark The various threads exist for other reasons -- some of the work is triggered by UI activity, some by responses to network requests, some by timers. – David Moles Jan 11 '11 at 21:06
  • 1
    To this question: "how to serialize multithreaded access to a resource?" you answered by yourself: use the lock() statement on your resources (or an object that encapsulates them). – BertuPG Feb 22 '11 at 10:56
  • @BertuPG IMHO it's better to have a single thread managing a resource; it's cleaner and helps avoid weird bugs in the future. The other threads send "messages" to that thread/object. This is the essence of object-oriented programming as defined by Alan Kay: not classes and inheritance, but messages and the execution model. The Qt framework's signal/slot mechanism (C++) applies this concept very well: objects can belong to threads that have event loops. – Aminos Sep 17 '22 at 19:28

6 Answers

4

Update: To address the comments on wasting resources (and if you're not using Rx), you can use a BlockingCollection (if you use the default constructor, it wraps a ConcurrentQueue) and just call .GetConsumingEnumerable(). There's an overload that takes a CancellationToken if the work is long-running. See the example below.


You can use ConcurrentQueue (if MonoTouch supports .NET 4?). It's thread-safe, and I think the implementation is actually lock-free. This works pretty well if you have a long-running task (like in a Windows service).

Generally, your problem sounds like you have multiple producers with a single consumer.

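var work = new BlockingCollection<Item>();
var producer1 = Task.Factory.StartNew(() => {
    work.Add(new Item()); // or whatever your threads are doing
});
var producer2 = Task.Factory.StartNew(() => {
    work.Add(new Item()); // etc
});
var consumer = Task.Factory.StartNew(() => {
    // Blocks while the collection is empty; the loop ends once
    // CompleteAdding() has been called and the queue has drained.
    foreach (var item in work.GetConsumingEnumerable()) {
        // do the work
    }
});
Task.WaitAll(producer1, producer2);
work.CompleteAdding(); // without this, the consumer never finishes
consumer.Wait();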

You should use BlockingCollection if you have a finite pool of work items. Here's an MSDN page showing all of the new concurrent collection types.
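The CancellationToken overload mentioned in the update can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the class name CancellableConsumerSketch, the Run method, and the string work items are assumptions made up for illustration; only BlockingCollection, GetConsumingEnumerable(CancellationToken), CompleteAdding, and the Task APIs come from the framework.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class: two producers, one consumer, optional cancellation.
public static class CancellableConsumerSketch
{
    // Returns the items in the order the single consumer handled them.
    public static List<string> Run(CancellationToken token)
    {
        var seen = new List<string>(); // only the consumer writes to this
        using (var work = new BlockingCollection<string>())
        {
            var consumer = Task.Factory.StartNew(() =>
            {
                try
                {
                    // Blocks while the queue is empty. Ends normally after
                    // CompleteAdding() once drained, or throws when cancelled.
                    foreach (var item in work.GetConsumingEnumerable(token))
                    {
                        seen.Add(item);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Cancelled: stop without draining the remaining items.
                }
            }, TaskCreationOptions.LongRunning); // hint: use a dedicated thread

            Task producerA = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 3; i++) work.Add("a" + i);
            });
            Task producerB = Task.Factory.StartNew(() =>
            {
                for (int i = 0; i < 3; i++) work.Add("b" + i);
            });

            Task.WaitAll(producerA, producerB);
            work.CompleteAdding(); // lets the consumer's foreach finish
            consumer.Wait();
        }
        return seen;
    }
}
```

Calling CancellableConsumerSketch.Run(CancellationToken.None) should hand back all six items. Items from the same producer stay in order, while the interleaving between the two producers depends on timing.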

Zachary Yates
  • 3
    While this solution might have the same outcome as the Java one, the `consumer` thread is going to keep running. The `TryDequeue` only provides safety against concurrency, but it will not keep waiting if there are no elements in the queue. If no elements are in the queue, you have one thread in a while loop running. The Java version definitely does not waste resources like this. A `wait` - `notify` solution would be much preferable over this. – andras Aug 08 '18 at 07:51
  • You can use the BlockingCollection.CompleteAdding() method which will stop the foreach consumer thread from running any further. – Abhijith C R Sep 28 '20 at 20:53
1

I believe this can be done using a SynchronizationContext. However, I have only done this to post back to the UI thread, which already has a synchronization context (if told to be installed) provided by .NET -- I don't know how to prepare it for use from a "vanilla thread" though.

Some links I found for "custom synchronizationcontext provider" (I have not had time to review these, do not fully understand the working/context, nor do I have any additional information):

  1. Looking for an example of a custom SynchronizationContext (Required for unit testing)

  2. http://codeidol.com/csharp/wcf/Concurrency-Management/Custom-Service-Synchronization-Context/
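For what it's worth, a SynchronizationContext for a "vanilla thread" might look something like the sketch below. This is only one possible shape under stated assumptions: the class name SingleThreadSynchronizationContext and the WorkerThreadId property are hypothetical, Send is deliberately left unsupported to keep the sketch short, and it is not taken from either of the links above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical class: every posted callback runs on one dedicated thread, in FIFO order.
public sealed class SingleThreadSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> _queue =
        new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();
    private readonly Thread _thread;

    public SingleThreadSynchronizationContext()
    {
        _thread = new Thread(RunLoop) { IsBackground = true };
        _thread.Start();
    }

    // Exposed only so callers can check which thread did the work.
    public int WorkerThreadId
    {
        get { return _thread.ManagedThreadId; }
    }

    // Queue the callback and return immediately.
    public override void Post(SendOrPostCallback d, object state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    // The base implementation would run the callback on the calling thread,
    // which defeats the purpose, so this sketch simply refuses.
    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotSupportedException("Use Post; Send is not implemented in this sketch.");
    }

    private void RunLoop()
    {
        // Make this context current on the worker thread.
        SetSynchronizationContext(this);
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            // Note: an exception thrown here is unhandled on this thread;
            // real code would want to catch and log it.
            item.Key(item.Value);
        }
    }

    // Stops accepting work, lets queued callbacks finish, then joins the thread.
    // Must not be called from the worker thread itself (Join would deadlock).
    public void Dispose()
    {
        _queue.CompleteAdding();
        _thread.Join();
        _queue.Dispose();
    }
}
```

Any thread can then call Post(callback, state) on a shared instance, and the callbacks run one after another on the worker thread. Disposing the context drains whatever is still queued before the thread exits.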

Happy coding.

Community
1

There is a more contemporary solution now available: the EventLoopScheduler class from Reactive Extensions (namespace System.Reactive.Concurrency).
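A rough sketch of how that might look, assuming the System.Reactive NuGet package is referenced. The class name EventLoopSchedulerSketch and the Run method are made up for illustration; EventLoopScheduler and the Schedule(Action) extension method are part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency; // from the System.Reactive package (assumed to be installed)
using System.Threading;

// Hypothetical demo class showing work items run in order on the scheduler's own thread.
public static class EventLoopSchedulerSketch
{
    public static List<int> Run()
    {
        var order = new List<int>(); // only the scheduler thread writes to this
        using (var done = new ManualResetEventSlim())
        using (var scheduler = new EventLoopScheduler())
        {
            for (int i = 0; i < 5; i++)
            {
                int n = i; // capture a copy for the closure
                scheduler.Schedule(() => order.Add(n));
            }

            // Queued last, so it runs after the items above.
            scheduler.Schedule(() => done.Set());
            done.Wait();
        }
        return order;
    }
}
```

Here the calling thread only hands off work; the scheduler's dedicated thread executes the items one at a time in the order they were scheduled.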

Matt Craig
0

Not native AFAIK, but look at this: Serial Task Executor; is this thread safe?

Community
Felice Pollano
0

I made an example here https://github.com/embeddedmz/message_passing_on_csharp which makes use of BlockingCollection.

So you will have a class that manages a resource, and you can use the class below, which creates a thread that will be the only one to manage it:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

    public class ResourceManagerThread<Resource>
        {
            private readonly Resource _managedResource;
            private readonly BlockingCollection<Action<Resource>> _tasksQueue;
            private Task _task;
            private readonly object _taskLock = new object();
    
            public ResourceManagerThread(Resource resource)
            {
                _managedResource = (resource != null) ? resource : throw new ArgumentNullException(nameof(resource));
                _tasksQueue = new BlockingCollection<Action<Resource>>();
            }
    
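            public Task<T> Enqueue<T>(Func<Resource, T> method)
            {
                var tcs = new TaskCompletionSource<T>();
                _tasksQueue.Add(r =>
                {
                    // Report failures through the returned Task; otherwise a
                    // throwing method would leave the caller waiting forever.
                    try { tcs.SetResult(method(r)); }
                    catch (Exception e) { tcs.SetException(e); }
                });
                return tcs.Task;
            }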
    
            public void Start()
            {
                lock (_taskLock)
                {
                    if (_task == null)
                    {
                        _task = Task.Run(ThreadMain);
                    }
                }
            }
    
            public void Stop()
            {
                lock (_taskLock)
                {
                    if (_task != null)
                    {
                        _tasksQueue.CompleteAdding();
                        _task.Wait();
                        _task = null;
                    }
                }
            }
    
            public bool HasStarted
            {
                get
                {
                    lock (_taskLock)
                    {
                        if (_task != null)
                        {
                            // Running, WaitingToRun and WaitingForActivation are all "not completed" states.
                            return !_task.IsCompleted;
                        }
                        else
                        {
                            return false;
                        }
                    }
                }
            }
    
            private void ThreadMain()
            {
                try
                {
                    foreach (var action in _tasksQueue.GetConsumingEnumerable())
                    {
                        try
                        {
                            action(_managedResource);
                        }
                        catch
                        {
                            //...
                        }
                    }
                }
                catch
                {
                }
            }
        }

Example:

private readonly DevicesManager _devicesManager;
private readonly ResourceManagerThread<DevicesManager> _devicesManagerThread;
        
//...
        
_devicesManagerThread = new ResourceManagerThread<DevicesManager>(_devicesManager);
_devicesManagerThread.Start();
_devicesManagerThread.Enqueue((DevicesManager dm) =>
{
    return dm.Initialize();
});

// Enqueue returns a Task. Use its 'Result' property (which blocks until the work item has run) to get the result of the 'message' or 'request' sent to the thread managing the resource.
Aminos
-4

As I wrote in comments, you discovered by yourself that the lock statement can do the work.

If you are interested in a "container" that simplifies the job of managing a queue of work items, look at the ThreadPool class.

I think that, in a well-designed architecture, with these two elements (the ThreadPool class and the lock statement) you can easily and successfully serialize access to resources.

BertuPG
  • 4
    The relevant phrase in my question is "cobbled together", and the relevant one in your answer is "in a well-designed architecture". In fact, while I'm not going to be cruel and vote it down, "the tools exist, go figure out how to build it" isn't really an answer at all. – David Moles Feb 22 '11 at 18:03
  • You can vote down all you want. I have no time to make a complete example for you to copy & paste and have your work done. You wrote about Queue, I supposed that you used it to manage threads, and I suggested that you look at the ThreadPool class to simplify your work. What's wrong?!? – BertuPG Feb 23 '11 at 09:19
  • @BertuPG It is preferable that a single thread manages a single resource; the code is more deterministic this way. Other threads can send messages to the thread managing the resource (e.g. Qt's threads run an event loop that has a thread-safe list of methods to be executed on the object belonging to the thread). – Aminos Sep 30 '22 at 14:54