
In my application I have a queue that fires notifications whenever the queue changes. When there are simultaneous operations on the queue, the event handler sometimes fires multiple times, and that's okay, but what I don't want is,...

Below is the code for the event handler:

private async void NotificationQueue_Changed(object sender, EventArgs e)
{
  if (!IsQueueInProcess)
    await ProcessQueue();
}

In the ProcessQueue method I set IsQueueInProcess to true, and set it back to false when processing completes. The problem is that when multiple event notifications fire simultaneously, multiple ProcessQueue calls start executing, which I don't want. I want to ensure that only one execution of ProcessQueue is running at any given time.

zx485
Dishant
  • Have another boolean value telling whether it is currently being executed? – Visual Vincent May 01 '16 at 08:28
  • Look at [this](https://support.microsoft.com/en-us/kb/816161) support page. – Jeroen Heier May 01 '16 at 08:59
  • @zee try to make the IsQueueInProcess static. – Ilan May 01 '16 at 09:03
  • Couldn't you just make `NotificationQueue_Changed()` a synchronous method? This `await ProcessQueue()` does not make much sense since the calling method itself is a fire and forget method (uses async void which cannot be awaited). – Benji Wa May 01 '16 at 09:10
  • IsQueueInProcess is already static, but the problem is that sometimes so many NotificationQueue_Changed events are raised at once that, before IsQueueInProcess gets a chance to be set to true in ProcessQueue, multiple ProcessQueue calls start executing. I don't want to make ProcessQueue synchronized; I just want to ignore events if one is already running. – Dishant May 01 '16 at 09:35
  • IsQueueInProcess should be a Mutex, and your function should perform a wait on it with little or no timeout. After getting the Mutex, your function should call ProcessQueue and release the Mutex after the processing returns. If a thread finds the Mutex already held by another thread, it skips the processing and does nothing. – o_weisman May 01 '16 at 11:48

2 Answers


Given your statement that this event is raised whenever there are any changes to the queue, and that the queue can be used concurrently (i.e. there are multiple producers adding things to the queue), it seems likely to me that the best way to address this would be to abandon the event-based behavior altogether. Instead, use BlockingCollection<T>, with a thread dedicated to processing the queue via GetConsumingEnumerable(). That method will block the thread as long as the queue is empty, and will allow the thread to remove and process items in the queue any time any other thread adds something to it. The collection itself is thread-safe, so using it you would not require any additional thread synchronization (for the handling of the queue, that is; it's possible that processing an item involves thread interactions, but there's nothing in your question that describes that aspect, so I can't say anything about that one way or the other).
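As a rough illustration of the dedicated-consumer approach described above, here is a minimal sketch. The class name NotificationProcessor, the string item type, and the handler delegate are all assumptions made for the example; nothing in the question says what the queue actually holds.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical wrapper: the name, the string item type, and the handler
// delegate are illustrative assumptions, not part of the original question.
public sealed class NotificationProcessor : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly Action<string> _handle;
    private readonly Thread _consumer;

    public NotificationProcessor(Action<string> handle)
    {
        _handle = handle;
        _consumer = new Thread(Consume) { IsBackground = true };
        _consumer.Start();
    }

    // Safe to call from any number of producer threads.
    public void Enqueue(string item) => _queue.Add(item);

    private void Consume()
    {
        // Blocks while the queue is empty; the loop ends once CompleteAdding()
        // has been called and the remaining items have been drained.
        foreach (string item in _queue.GetConsumingEnumerable())
        {
            _handle(item);
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // no more items will be accepted
        _consumer.Join();        // wait for the consumer to drain the queue
        _queue.Dispose();
    }
}
```

Because only the one consumer thread ever calls the handler, items are processed one at a time without any flag or semaphore. Disposing the wrapper waits for the items already queued to be handled.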

That said, taking the question literally, the simplest approach would be to include a semaphore:

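// Allows a single holder at a time: initial count 1, maximum count 1.
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

private async void NotificationQueue_Changed(object sender, EventArgs e)
{
    // Wait(0) never blocks: it returns false if processing is already underway.
    if (_semaphore.Wait(0))
    {
        try
        {
            await ProcessQueue();
        }
        finally
        {
            // Release even if ProcessQueue() throws, so later events are not ignored forever.
            _semaphore.Release();
        }
    }
}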

The above attempts to acquire the semaphore's lock. With a timeout of 0 milliseconds, it will return immediately even if the semaphore could not be acquired. The return value indicates whether the semaphore was successfully acquired or not.

In this way, as long as there is no outstanding queue-processing operation, the current event handler invocation can acquire the semaphore and will call the ProcessQueue() method. When that operation completes, the continuation will release the semaphore. Until that happens, no other invocation of the event handler will be able to acquire the semaphore, and thus will not initiate processing of the queue.
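The non-blocking acquire described above can be seen in isolation in the following small sketch. The class and method names are made up for the example; only SemaphoreSlim and its Wait(int) and Release() members come from the framework.

```csharp
using System.Threading;

// Hypothetical helper used only to show what Wait(0) returns.
public static class SemaphoreProbe
{
    // Returns the results of three non-blocking acquire attempts.
    public static bool[] Run()
    {
        var semaphore = new SemaphoreSlim(1, 1);

        bool first = semaphore.Wait(0);   // count 1 -> 0: acquired
        bool second = semaphore.Wait(0);  // count is 0: returns at once, not acquired
        semaphore.Release();              // count 0 -> 1
        bool third = semaphore.Wait(0);   // acquired again

        return new[] { first, second, third };
    }
}
```

The second attempt corresponds to an event arriving while processing is still in progress: it is skipped rather than queued behind the first.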


I'll note that nothing here resolves races between threads in a way that guarantees the queue is always either empty or being processed. It's up to you to ensure that ProcessQueue() has the synchronization needed so that, if a thread modifies the queue and raises this event, that thread initiates another round of processing whenever the current round cannot observe its change.

Or put another way, you need to make sure that for any thread that is going to raise that event, either its change to the queue will be observed by the current processing operation, or that thread will initiate a new one.
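One common way to get that guarantee, which is not something the code above does, is to remember that an event arrived while processing was running and to run one more pass afterwards. The sketch below is only an illustration under that assumption; the name CoalescingRunner and its members are hypothetical, and the processing delegate stands in for ProcessQueue().

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: runs at most one processing pass at a time, and
// schedules exactly one follow-up pass if signals arrive in the meantime.
public sealed class CoalescingRunner
{
    private readonly object _gate = new object();
    private readonly Func<Task> _process; // stand-in for ProcessQueue()
    private bool _running;
    private bool _pending;

    public CoalescingRunner(Func<Task> process) => _process = process;

    public async Task SignalAsync()
    {
        lock (_gate)
        {
            if (_running)
            {
                _pending = true; // the current runner will do another pass
                return;
            }
            _running = true;
        }

        try
        {
            while (true)
            {
                await _process();
                lock (_gate)
                {
                    if (!_pending)
                    {
                        _running = false; // go idle atomically with the check
                        return;
                    }
                    _pending = false; // a signal arrived mid-pass: run again
                }
            }
        }
        catch
        {
            // Reset so that a failed pass does not block all future signals.
            lock (_gate)
            {
                _running = false;
                _pending = false;
            }
            throw;
        }
    }
}
```

The check of the pending flag and the transition back to idle happen under the same lock, so a signal either sees a running pass and sets the flag, or finds the runner idle and starts a new pass itself. Signals that arrive during one pass are collapsed into a single follow-up pass.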

There's not enough context in your question for anyone to be able to address that concern specifically. I will just point out that it's a common enough thing for someone to overlook when trying to implement this sort of system. IMHO, all the more reason to just have a dedicated thread using BlockingCollection<T> to consume elements added to the queue. :)


See also the related question How to avoid reentrancy with async void event handlers?. This is a slightly different question, in that the accepted answer causes each invocation of the event handler to result in the operation initiated by the event handler. Your scenario is simpler, since you simply want to skip initiation of a new operation, but you may still find some useful insight there.

Peter Duniho

I agree with Peter that abandoning event-based notifications is the best solution, and that you should move to a producer/consumer queue. However, I recommend one of the TPL Dataflow blocks instead of BlockingCollection<T>.

In particular, ActionBlock<T> should work quite nicely:

private readonly ActionBlock<T> notificationQueue = new ActionBlock<T>(async t =>
{
  await ProcessQueueItem(t);
});

By default, TPL Dataflow blocks have a concurrency limit of 1.
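To show how producers would feed such a block, here is a small self-contained sketch. It assumes int items and uses Task.Delay as a stand-in for ProcessQueueItem; the class and method names are invented for the example. On older frameworks the types come from the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; int items and Task.Delay are assumptions.
public static class ActionBlockSketch
{
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();

        var block = new ActionBlock<int>(async item =>
        {
            await Task.Delay(10); // stand-in for ProcessQueueItem(item)
            processed.Add(item);  // no lock needed: one item is handled at a time by default
        });

        // Several producers post concurrently; Post() is thread-safe.
        Parallel.For(0, 20, i => { block.Post(i); });

        block.Complete();       // signal that no more items will arrive
        await block.Completion; // wait until everything posted has been processed
        return processed;
    }
}
```

All 20 items are processed, one at a time, although their order depends on how the producers happened to interleave.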

Stephen Cleary