44

I know that asynchronous programming has seen a lot of changes over the years. I'm somewhat embarrassed that I let myself get this rusty at just 34 years old, but I'm counting on StackOverflow to bring me up to speed.

What I am trying to do is manage a queue of "work" on a separate thread, but in such a way that only one item is processed at a time. I want to post work on this thread and it doesn't need to pass anything back to the caller. Of course I could simply spin up a new Thread object and have it loop over a shared Queue object, using sleeps, interrupts, wait handles, etc. But I know things have gotten better since then. We have BlockingCollection, Task, async/await, not to mention NuGet packages that probably abstract a lot of that.

I know that "What's the best..." questions are generally frowned upon so I'll rephrase it by saying "What is the currently recommended..." way to accomplish something like this using built-in .NET mechanisms preferably. But if a third party NuGet package simplifies things a bunch, it's just as well.

I considered a TaskScheduler instance with a fixed maximum concurrency of 1, but it seems there is probably a much less clunky way to do that by now.
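For illustration only, here is roughly the shape I had in mind, a minimal sketch using the built-in ConcurrentExclusiveSchedulerPair (its ExclusiveScheduler runs at most one queued task at a time):

// ExclusiveScheduler guarantees at most one task executes at a time.
var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
var factory = new TaskFactory(scheduler);

// Fire-and-forget: the work is queued and runs serially.
factory.StartNew(() => LocateAddress(context.Request.UserHostAddress));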

Background

Specifically, what I am trying to do in this case is queue an IP geolocation task during a web request. The same IP might wind up getting queued for geolocation multiple times, but the task will know how to detect that and skip out early if the address has already been resolved. The request handler is just going to throw these () => LocateAddress(context.Request.UserHostAddress) calls into a queue and let the LocateAddress method handle duplicate work detection. The geolocation API I am using doesn't like to be bombarded with requests, which is why I want to limit the work to a single concurrent task at a time. However, it would be nice if the approach could easily scale to more concurrent tasks with a simple parameter change.

Josh
  • 1
    I would use `BlockingCollection`. Sounds like a simple producer/consumer. – Zer0 Sep 05 '14 at 18:22
  • @Zer0 That would be useful for a synchronous producer/consumer model, but not for an asynchronous one. – Servy Sep 05 '14 at 18:27
  • @Servy No - `BlockingCollection` works great for an asynchronous one. Let me write up an example then. – Zer0 Sep 05 '14 at 18:29
  • Agreed that it is a simple producer/consumer. And the concurrent collections in .NET probably would make the "spin up a `new Thread(...)`" scenario simpler. I guess I was assuming/hoping .NET (or a popular library) might have by now added some concept of a configurable thread pool. – Josh Sep 05 '14 at 18:30
  • @Josh It has done exactly that. – Servy Sep 05 '14 at 18:32
  • 2
    @Josh: I think Servy's answer gives you what you *need* - i.e., a way of limiting (asynchronous) concurrency. There's no dedicated thread, nor the guarantee of FIFO ordering, but it sounds like it would do what you need. That said, if you really want a queue running on a thread, you could use [`AsyncContext`/`AsyncContextThread` from my AsyncEx library](https://github.com/StephenCleary/AsyncEx/wiki/AsyncContext). – Stephen Cleary Sep 05 '14 at 19:47
  • This doesn't answer the threading question, but an Azure Service Bus with duplicate detection plus an Azure Function with maxConcurrentCalls or other throttling would scale very well. – Jonas Stensved Nov 24 '21 at 12:39

7 Answers

68

To create an asynchronous queue with a single degree of parallelism, you can simply create a SemaphoreSlim initialized to one, and then have the enqueuing method await the acquisition of that semaphore before starting the requested work.

public class TaskQueue
{
    private readonly SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

Of course, to have a fixed degree of parallelism other than one, simply initialize the semaphore to that number instead.
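For example, a minimal usage sketch (LocateAddressAsync here is an assumed async wrapper around the question's LocateAddress):

var taskQueue = new TaskQueue();

// Each enqueued operation starts only after the previous one has finished.
await taskQueue.Enqueue(() => LocateAddressAsync(context.Request.UserHostAddress));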

Servy
  • 19
    Note that this code *does not* execute tasks on a separate dedicated thread (as asked in the title), but instead guarantees that tasks run one by one, as the OP actually needed. – Alexei Levenkov Jun 02 '15 at 14:43
  • 4
    @ThibaultD. No, it does not. – Servy Nov 02 '16 at 13:09
  • 1
    @Thibault D. my answer below does – Alexander Danilov Nov 03 '16 at 20:13
  • 1
    Would it be ok to implement a running task counter as following : `Interlocked.Increment(ref counter); await semaphore.WaitAsync(); [...] Interlocked.Decrement(ref counter); semaphore.Release();` or is there a better way to do it ? – Profet Nov 29 '16 at 12:22
  • @Profet Just use `semaphore.CurrentCount`; – Servy Nov 29 '16 at 14:11
  • Isn't that the number of available tasks that can enter the semaphore ? – Profet Nov 29 '16 at 14:24
  • @Profet You can subtract it from what you initialized it to to get the number currently running. – Servy Nov 29 '16 at 14:30
  • My counter tracks the waiting (+ running) tasks, not only the "running" ones! I submitted an improved version of your class there : http://codereview.stackexchange.com/questions/148459/wpf-async-observabletaskqueue-class feel free to comment... :) – Profet Nov 29 '16 at 17:44
  • @Profet To get the number of tasks *trying* to run, yes, you need a counter. In your earlier comment you asked for a count of the number of tasks currently running (rather than waiting to run), hence my note that you can ask the semaphore. The semaphore does not (publicly) expose the number of people currently waiting on it, so for that you have to keep track yourself. – Servy Nov 29 '16 at 18:04
23

Your best option as I see it is using TPL Dataflow's ActionBlock:

var actionBlock = new ActionBlock<string>(address =>
{
    if (!IsDuplicate(address))
    {
        LocateAddress(address);
    }
});

actionBlock.Post(context.Request.UserHostAddress);

TPL Dataflow is a robust, thread-safe, async-ready, and very configurable actor-based framework (available as a NuGet package).

Here's a simple example for a more complicated case. Let's assume you want to:

  • Enable concurrency (limited to the available cores).
  • Limit the queue size (so you won't run out of memory).
  • Have both LocateAddress and the queue insertion be async.
  • Cancel everything after an hour.

var actionBlock = new ActionBlock<string>(async address =>
{
    if (!IsDuplicate(address))
    {
        await LocateAddressAsync(address);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10000,
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});

await actionBlock.SendAsync(context.Request.UserHostAddress);
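
If you also need an orderly shutdown, you can signal the block that no more items are coming and await it draining:

// Signal that no more work will be posted, then wait for
// already-queued items to finish processing.
actionBlock.Complete();
await actionBlock.Completion;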
i3arnon
  • 5
    Very nice! I will definitely check this out. I have never looked into TPL Dataflow closely but it sounds like it has a lot of advantages over a dedicated thread. – Josh Sep 06 '14 at 20:00
21

Actually, you don't need to run the tasks on one thread; you need them to run serially (one after another) and in FIFO order. TPL doesn't have a class for that, but here is my very lightweight, non-blocking implementation, with tests: https://github.com/Gentlee/SerialQueue

That repository also contains @Servy's implementation; the tests show it is twice as slow as mine, and it doesn't guarantee FIFO ordering.

Example:

private readonly SerialQueue queue = new SerialQueue();

async Task SomeAsyncMethod()
{
    var result = await queue.Enqueue(DoSomething);
}
Alexander Danilov
13

Use BlockingCollection<Action> to create a producer/consumer pattern with one consumer (so only one thing runs at a time, as you want) and one or many producers.

First define a shared queue somewhere:

BlockingCollection<Action> queue = new BlockingCollection<Action>();

In your consumer Thread or Task you take from it:

// This will block until there's an item available
Action itemToRun = queue.Take();

Then from any number of producers on other threads, simply add to the queue:

queue.Add(() => LocateAddress(context.Request.UserHostAddress));
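
Putting it together, a minimal consumer sketch on a dedicated long-running task (one consumer means one item at a time):

// The loop blocks inside GetConsumingEnumerable until items arrive,
// and ends once CompleteAdding() is called on the collection.
Task.Factory.StartNew(() =>
{
    foreach (Action itemToRun in queue.GetConsumingEnumerable())
    {
        itemToRun();
    }
}, TaskCreationOptions.LongRunning);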
Zer0
  • 2
    This requires the consumer to be processing the tasks synchronously. There has to be a thread sitting there doing nothing when there is no work, rather than consuming the operations asynchronously such that there is no thread when there is no work to be done. – Servy Sep 05 '14 at 18:35
  • That's true, but in this scenario, that's okay because as long as any other threads are doing anything, there will be work being thrown onto the queue (even if that work may be immediately abandoned because it isn't necessary). – Josh Sep 05 '14 at 18:37
  • 2
    @Servy The thread will be in a wait state if there is no work to be done. It won't be wasting any CPU time so I don't see any problem with that. Although I still don't understand your comments on "synchronously" or "asynchronously". This is an asynchronous design. – Zer0 Sep 05 '14 at 18:37
  • This question - which I didn't find until it was shown as a related question - asks almost exactly the same question with pretty much the same answers. It was asked two years ago so maybe the state of .NET async hasn't changed as much as I thought? http://stackoverflow.com/questions/11138927/best-way-to-limit-the-number-of-active-tasks-running-via-the-parallel-task-libra – Josh Sep 05 '14 at 18:38
  • 3
    @Zer0 It's an asynchronous *producer* but an entirely synchronous *consumer*. Threads are still expensive, spinning up a lot of them just to have them sitting around doing nothing is still expensive and should be avoided. – Servy Sep 05 '14 at 18:39
  • @Servy That is correct, it is a synchronous consumer, and that works fine when I want to restrict it to one concurrent task (and probably performs better too since it's all on the same thread). But it's also true that your semaphore solution allows an easy scale to 2 or more concurrent tasks which I might need to go to later. – Josh Sep 05 '14 at 18:41
  • @Josh It also limits the operations specifically to executing a synchronous method, rather than being able to handle *any* type of asynchronous operation, whether that asynchronous operation is executing a method in another thread, performing asynchronous IO, or anything else. It also leaves virtually no means for the code adding the task to know when it is finished or what result it computes, if there is one, and it likely isn't going to perform better, because, as mentioned earlier, it's adding in the costs of creating new threads that aren't needed. – Servy Sep 05 '14 at 18:44
  • @Josh This will perform great. `BlockingCollection` uses `ConcurrentQueue` by default. If you actually wanted to scale this up you'll simply add another `Thread` or `Task` consumer. And yes, threads are expensive, but you **need** additional threads to add more parallelism. As it stands this uses a single `Task` or `Thread` which is nothing to be concerned about. – Zer0 Sep 05 '14 at 18:49
  • 1
    @Servy That limitation is easily fixed. Change the definition to `BlockingCollection<Task>`... – Zer0 Sep 05 '14 at 18:51
  • 1
    @Zer0 That doesn't solve the problem. You need to be able to provide either a `Func<Task>` or a `Func<Task<T>>` to get that behavior, as is demonstrated in my answer. If you accept a `Task` then the operation has already started. And when you accept a `Func` returning the task you then need a way of having the caller get a hold of the `Task` that the function returns when it actually gets executed. – Servy Sep 05 '14 at 18:53
  • @Servy Yes it does. You can add a `Task` without starting it. In fact you certainly shouldn't start the `Task` before adding it to the queue. Finally, in most producer/consumer patterns it's incredibly rare that the _producer_ cares about the result. Otherwise he's not a very good producer. – Zer0 Sep 05 '14 at 18:55
  • 1
    @Zer0 The BlockingCollection wouldn't be expensive, you creating a whole bunch of threads to be consumers *would* be expensive, or at least more expensive than it needs to be. This is particularly true if the consumers aren't entirely saturated with work 100% of the time. `you need additional threads to add more parallelism` No, you don't. I demonstrate in my answer exactly how to create N degrees of parallelism with as few as *zero* threads executing. – Servy Sep 05 '14 at 18:55
  • It's true that I can spin up multiple threads that monitor the same queue, but that's a code change rather than something that can be easily tweaked with a config setting or constant. But the caller doesn't care about the result or success in this case. It's simply fire-and-forget but limited to one at a time for rate-limiting purposes. – Josh Sep 05 '14 at 18:56
  • @Zer0 The only tasks that can be created without being started are tasks that represent executing a delegate in a thread. You cannot create a startable `Task` that represents any other type of asynchronous operation. To support that you need to accept a function that can create the task. – Servy Sep 05 '14 at 18:56
  • @Servy I'm not sure what you mean by parallelism then. In order to do actual parallelism (as far as the CPU is concerned) then yes, you **need** more than one thread. But anyways, this discussion is getting far too long for comments. – Zer0 Sep 05 '14 at 18:59
  • Essentially what I was wondering is if .NET had added anything that allowed me to construct a private instance of something like ThreadPool but with just a single thread in the pool (or 2, or 3, etc.) and have them execute a synchronous delegate as the work item. `BlockingCollection` definitely makes it easier to roll my own thread to do that, but I guess there still isn't something built-in. – Josh Sep 05 '14 at 19:00
  • @Servy No - you can add any `Task` at all to the `BlockingCollection`. There are no limitations and they do not need to be started. I think we need to agree to disagree here. – Zer0 Sep 05 '14 at 19:01
  • 2
    @Zer0 That's just it *the CPU doesn't need to be concerned at all* for parallelism to happen. Parallelism can happen without any threads at all. You can, for example, have multiple pending IO requests, each doing work, with no threads being used at all. There are lots of ways of doing "work" without using the CPU. – Servy Sep 05 '14 at 19:02
  • @Zer0 If you plan to have your consumer call `Start` on the task then yes, you need to have startable tasks. If you plan to accept tasks that aren't startable then the operation would need to have started before passing it to the BC, which defeats the whole purpose of using it as it isn't limiting parallelism. – Servy Sep 05 '14 at 19:03
  • @Zer0 See [this post](http://stackoverflow.com/a/23833635/1159478) for a more in depth description of the differences between parallelism and multithreading. They are very different concepts, and you're conflating the two. – Servy Sep 05 '14 at 19:05
  • 1
    For what it's worth, I should just reiterate that my original intention was to *avoid* parallelism. The API I am calling doesn't want to be called frequently. I want to stick things in a queue so the consumer can get to them one at a time. – Josh Sep 05 '14 at 19:05
  • @Josh The above implementation is about as simple as it gets. I can't think of an easier way. It's 3 lines of code. And personally, I would avoid using synchronization primitives for reasons I won't get into. There's no reason to reinvent the wheel here. – Zer0 Sep 05 '14 at 19:06
  • @Josh [See MSDN](http://msdn.microsoft.com/en-us/library/dd267312(v=vs.110).aspx) for more information on this collection. It's specifically designed for producer/consumer. – Zer0 Sep 05 '14 at 19:12
  • Oh this is definitely simple, no doubt. I wasn't necessarily looking for *easy*, but rather a recommended approach (preferably native) that specifically addressed this concern of limiting concurrency. – Josh Sep 05 '14 at 19:20
  • @Josh You realize that, at its core, my answer is also three lines of code. I simply took the time to wrap those three lines of code into a class that handles all of the responsibility of managing this logic so that, for a consumer, the problem becomes *one* line of code. – Servy Sep 05 '14 at 19:37
5

I'm posting a different solution here. To be honest I'm not sure whether this is a good solution.

I'm used to using BlockingCollection to implement the producer/consumer pattern, with a dedicated thread consuming the items. That works fine if there is always data coming in, so the consumer thread never sits there doing nothing.

I encountered a scenario where one of the applications wanted to send emails on a different thread, but the total number of emails was not that big. My initial solution was to have a dedicated consumer thread (created by Task.Run()), but a lot of the time it just sat there and did nothing.

Old solution:

private readonly BlockingCollection<EmailData> _Emails =
    new BlockingCollection<EmailData>(new ConcurrentQueue<EmailData>());

// producer can add data here
public void Add(EmailData emailData)
{
    _Emails.Add(emailData);
}

public void Run()
{
    // create a consumer thread
    Task.Run(() => 
    {
        foreach (var emailData in _Emails.GetConsumingEnumerable())
        {
            SendEmail(emailData);
        }
    });
}

// sending email implementation
private void SendEmail(EmailData emailData)
{
    throw new NotImplementedException();
}

As you can see, if there are not enough emails to be sent (and that is my case), the consumer thread will spend most of its time sitting there doing nothing at all.

I changed my implementation to:

// create an empty task
private Task _SendEmailTask = Task.Run(() => {});

// caller will dispatch the email to here
// ContinueWith will use a thread pool thread (different from the
// _SendEmailTask thread) to send this email
private void Add(EmailData emailData)
{
    _SendEmailTask = _SendEmailTask.ContinueWith((t) =>
    {
        SendEmail(emailData);
    });
}

// actual implementation
private void SendEmail(EmailData emailData)
{
    throw new NotImplementedException();
}

It's no longer a producer/consumer pattern, but it won't have a thread sitting there doing nothing; instead, every time there is an email to send, it uses a thread pool thread to do it.
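
One caveat: the read-modify-write of _SendEmailTask in Add is not atomic, so if Add can be called from several threads at once, a continuation could be lost. A minimal sketch of a lock-guarded variant (using Task.CompletedTask, available since .NET 4.6, in place of the empty Task.Run):

private readonly object _Gate = new object();
private Task _SendEmailTask = Task.CompletedTask;

private void Add(EmailData emailData)
{
    // Serialize updates to the chain so no continuation is lost.
    lock (_Gate)
    {
        _SendEmailTask = _SendEmailTask.ContinueWith(t => SendEmail(emailData));
    }
}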

Kael Xu
  • Would there be race conditions in your `Add` method? For example, there are two threads calling this method at the same time. But anyway, this is a really concise but effective solution. – Juniver Hazoic May 07 '19 at 08:56
1

My library can:

  1. Run queued items in random order
  2. Manage multiple queues
  3. Run prioritized items first
  4. Re-queue items
  5. Raise an event when all queued work has completed
  6. Cancel a running item, or cancel an item waiting to run
  7. Dispatch events to the UI thread

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Threading.Tasks;
using System.Windows.Threading; // for Dispatcher (WPF)

public interface IQueue
  {
    bool IsPrioritize { get; }
    bool ReQueue { get; }
    /// <summary>
    /// Don't use async
    /// </summary>
    /// <returns></returns>
    Task DoWork();
    bool CheckEquals(IQueue queue);
    void Cancel();
  }

  public delegate void QueueComplete<T>(T queue) where T : IQueue;
  public delegate void RunComplete();
  public class TaskQueue<T> where T : IQueue
  {
    readonly List<T> Queues = new List<T>();
    readonly List<T> Runnings = new List<T>();

    [Browsable(false), DefaultValue((string)null)]
    public Dispatcher Dispatcher { get; set; }
    public event RunComplete OnRunComplete;
    public event QueueComplete<T> OnQueueComplete;
    int _MaxRun = 1;
    public int MaxRun
    {
      get { return _MaxRun; }
      set
      {
        bool flag = value > _MaxRun;
        _MaxRun = value;
        if (flag && Queues.Count != 0) RunNewQueue();
      }
    }
    public int RunningCount
    {
      get { return Runnings.Count; }
    }
    public int QueueCount
    {
      get { return Queues.Count; }
    }

    public bool RunRandom { get; set; } = false;

    //need lock Queues first
    void StartQueue(T queue)
    {
      if (null != queue)
      {
        Queues.Remove(queue);
        lock (Runnings) Runnings.Add(queue);
        queue.DoWork().ContinueWith(ContinueTaskResult, queue);
      }
    }

    void RunNewQueue()
    {
      lock (Queues)//Prioritize
      {
        foreach (var q in Queues.Where(x => x.IsPrioritize)) StartQueue(q);
      }

      if (Runnings.Count >= MaxRun) return;//other
      else if (Queues.Count == 0)
      {
        if (Runnings.Count == 0 && OnRunComplete != null)
        {
          if (Dispatcher != null && !Dispatcher.CheckAccess()) Dispatcher.Invoke(OnRunComplete);
          else OnRunComplete.Invoke();//on completed
        }
        else return;
      }
      else
      {
        lock (Queues)
        {
          T queue;
          if (RunRandom) queue = Queues.OrderBy(x => Guid.NewGuid()).FirstOrDefault();
          else queue = Queues.FirstOrDefault();
          StartQueue(queue);
        }
        if (Queues.Count > 0 && Runnings.Count < MaxRun) RunNewQueue();
      }
    }

    void ContinueTaskResult(Task Result, object queue_obj) => QueueCompleted((T)queue_obj);

    void QueueCompleted(T queue)
    {
      lock (Runnings) Runnings.Remove(queue);
      if (queue.ReQueue) lock (Queues) Queues.Add(queue);
      if (OnQueueComplete != null)
      {
        if (Dispatcher != null && !Dispatcher.CheckAccess()) Dispatcher.Invoke(OnQueueComplete, queue);
        else OnQueueComplete.Invoke(queue);
      }
      RunNewQueue();
    }

    public void Add(T queue)
    {
      if (null == queue) throw new ArgumentNullException(nameof(queue));
      lock (Queues) Queues.Add(queue);
      RunNewQueue();
    }

    public void Cancel(T queue)
    {
      if (null == queue) throw new ArgumentNullException(nameof(queue));
      lock (Queues) Queues.RemoveAll(o => o.CheckEquals(queue));
      lock (Runnings) Runnings.ForEach(o => { if (o.CheckEquals(queue)) o.Cancel(); });
    }

    public void Reset(T queue)
    {
      if (null == queue) throw new ArgumentNullException(nameof(queue));
      Cancel(queue);
      Add(queue);
    }

    public void ShutDown()
    {
      MaxRun = 0;
      lock (Queues) Queues.Clear();
      lock (Runnings) Runnings.ForEach(o => o.Cancel());
    }
  }
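
A hypothetical usage sketch (DownloadJob and its body are illustrative, not part of the library):

var taskQueue = new TaskQueue<DownloadJob> { MaxRun = 1 };
taskQueue.Add(new DownloadJob());

class DownloadJob : IQueue
{
    public bool IsPrioritize => false;
    public bool ReQueue => false;
    public Task DoWork() => Task.Delay(1000); // stand-in for real work
    public bool CheckEquals(IQueue queue) => ReferenceEquals(this, queue);
    public void Cancel() { }
}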
-1

I know this thread is old, but it seems all the present solutions are extremely onerous. The simplest way I could find uses the LINQ Aggregate function to create a daisy-chained list of tasks.

var arr = new int[] { 1, 2, 3, 4, 5};
var queue = arr.Aggregate(Task.CompletedTask, 
    (prev, item) => prev.ContinueWith(antecedent => PerformWorkHere(item)));

The idea is to get your data into an IEnumerable (I'm using an int array), and then reduce that enumerable to a chain of tasks, starting with an already-completed task.
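
Since Aggregate returns the tail of the chain, you can await it to know when everything has finished; a one-line sketch:

// Awaiting the tail waits for every item to finish. Exceptions thrown by
// earlier items should be handled inside PerformWorkHere, since ContinueWith
// does not propagate an antecedent's exception down the chain automatically.
await queue;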

Cobus Kruger
  • After a few days you will end up with OOM because the GC couldn't touch your gigantic chain, because each new task ends up on the pile of old ones. – greenoldman Jul 26 '23 at 14:32