
I have this producer/consumer code:

Main:

static void Main()
{
    using(PCQueue q = new PCQueue(2))
    {
        for(int i = 0; i < 10; i++)
        {
            int itemNumber = i; // To avoid the captured variable trap
            q.EnqueueItem(() => {
                Thread.Sleep(1000); // Simulate time-consuming work
                Console.Write(" Task" + itemNumber);
            });
        }
        Console.WriteLine("Enqueued 10 items");
        Console.WriteLine("Waiting for items to complete...");
    }

}

Class:

   public class PCQueue: IDisposable
  {
      readonly object _locker = new object();
      Thread[] _workers;
      Queue<Action> _itemQ = new Queue<Action>();
      public PCQueue(int workerCount)
      {
          _workers = new Thread[workerCount];
          // Create and start a separate thread for each worker
          for(int i = 0; i < workerCount; i++)
              (_workers[i] = new Thread(Consume)).Start();
      }
      public void Dispose()
      {
          // Enqueue one null item per worker to make each exit.
          foreach(Thread worker in _workers) EnqueueItem(null);
      }
      public void EnqueueItem(Action item)
      {
          lock(_locker)
          {
              _itemQ.Enqueue(item); // We must pulse because we're
              Monitor.Pulse(_locker); // changing a blocking condition.
          }
      }
      void Consume()
      {
          while(true) // Keep consuming until
          { // told otherwise.
              Action item;
              lock(_locker)
              {
                  while(_itemQ.Count == 0) Monitor.Wait(_locker);
                  item = _itemQ.Dequeue();
              }
              if(item == null) return; // This signals our exit.
              item(); // Execute item.
          }
      }
  }

Question:

Let's say executing `item()` takes a long time.

1) We enqueue a new work item and pulse. (One consumer is busy now.)
2) We enqueue a new work item and pulse. (The second consumer is busy now.)
3) We enqueue a new work item and pulse.

And now? Both threads are busy!

I know that the pulse will be lost (or not?)

Is the only solution to change it to an AutoResetEvent?

Royi Namir
  • What is wrong with System.Collections.Concurrent.BlockingCollection? If you want to DIY, you can use System.Threading.Semaphore and a lock. Events are not an appropriate solution for producer-consumer queues - I have no idea why so many developers are duped into using them for such a purpose. – Martin James Aug 02 '12 at 13:48
  • @MartinJames Nothing is wrong. I just want to know how I can enhance the code, and to check whether my assumptions were right. – Royi Namir Aug 02 '12 at 13:49
  • @MartinJames I guess you will have to ask Joe Albahari about your question. http://books.google.co.il/books?id=VENrFSQFco8C&pg=PA846&lpg=PA846&dq=%22+To+avoid+the+captured+variable+trap%22&source=bl&ots=3uV-ribX9Q&sig=gjsMPGyZD6H-DgcwjR_vuv5V8aI&hl=en&sa=X&ei=noUaUJKpLcem0QXHsYC4Cw&redir_esc=y#v=onepage&q=%22%20To%20avoid%20the%20captured%20variable%20trap%22&f=false – Royi Namir Aug 02 '12 at 13:51
  • Use a semaphore - it WILL work correctly with multiple producers/consumers. An AutoResetEvent might work correctly with multiple producers/consumers or it might not. I've never bothered to test it since it's just not the appropriate synchro to use. – Martin James Aug 02 '12 at 13:52
  • Did you actually observe a problem? – H H Aug 02 '12 at 13:52
  • @Vitaliy I'm losing a pulse (when all threads are busy). – Royi Namir Aug 02 '12 at 13:57
  • It's not as if there's a big yellow box in the [documentation](http://msdn.microsoft.com/en-us/library/system.threading.monitor.pulse.aspx) saying: "if you call `Pulse` when no threads are waiting, the next thread that calls `Wait` blocks as if `Pulse` had never been called." – Damien_The_Unbeliever Aug 02 '12 at 13:59
  • @Damien_The_Unbeliever please see this : http://books.google.co.il/books?id=_Y0rWd-Q2xkC&pg=PA675&lpg=PA675&dq=%22If+Wait+executes+first,+the+signal+works%22&source=bl&ots=R8EwV7WFWm&sig=D1AzcC6T52ju-LCJdcKN7GssJls&hl=en&sa=X&ei=S4gaULOeIqWs0QX23IHIBA&redir_esc=y#v=onepage&q=%22If%20Wait%20executes%20first%2C%20the%20signal%20works%22&f=false – Royi Namir Aug 02 '12 at 14:02
  • @RoyiNamir So what? The queue is not empty, so there will be no deadlock, if that's what you fear. – Vitaliy Aug 02 '12 at 14:02
  • @RoyiNamir - looked briefly at your ref. The Threads 'Getting Started' section starts by mentioning 'time-slice', 'Join' and 'IsAlive' on the very first page. Couldn't be bothered to read on any further. – Martin James Aug 02 '12 at 14:02
  • @MartinJames the link is http://books.google.co.il/books?id=_Y0rWd-Q2xkC&pg=PA676&lpg=PA676&dq=%22Create+and+start+a+separate+thread+for+each+worker%22&source=bl&ots=R8EwV7VL1l&sig=-H8aworDuhQjR9KrAZgToxUKPUU&hl=en&sa=X&ei=AIcaUIC4HsbB0QW60YHoAQ&redir_esc=y#v=onepage&q=%22Create%20and%20start%20a%20separate%20thread%20for%20each%20worker%22&f=false – Royi Namir Aug 02 '12 at 14:06
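
For comparison, the `System.Collections.Concurrent.BlockingCollection` alternative mentioned in the comments could be sketched roughly as follows. This is a minimal illustration, not a drop-in replacement for `PCQueue`: the class name `BlockingCollectionDemo`, the `Run` method, and the 20 ms simulated work are assumptions made for the example. Shutdown is signalled with `CompleteAdding()` instead of enqueuing one null item per worker.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical demo class; the names here are illustrative only.
static class BlockingCollectionDemo
{
    // Starts workerCount consumers, enqueues itemCount work items,
    // and returns how many items were actually executed.
    public static int Run(int workerCount, int itemCount)
    {
        int processed = 0;
        using (var queue = new BlockingCollection<Action>())
        {
            var workers = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
            {
                workers[i] = new Thread(() =>
                {
                    // Blocks while the collection is empty; the loop ends
                    // once CompleteAdding() was called and the queue drained.
                    foreach (Action item in queue.GetConsumingEnumerable())
                        item();
                });
                workers[i].Start();
            }

            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(() =>
                {
                    Thread.Sleep(20); // Simulate work (assumed duration)
                    Interlocked.Increment(ref processed);
                });
            }

            queue.CompleteAdding(); // No more items: lets the consumers exit
            foreach (Thread worker in workers) worker.Join();
        }
        return processed;
    }

    static void Main()
    {
        Console.WriteLine("Processed " + Run(2, 10) + " of 10 items");
    }
}
```

The collection handles the blocking and waking of consumers internally, so there is no explicit `Wait`/`Pulse` pair to reason about.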

1 Answer


> And now? Both threads are busy!
> I know that the pulse will be lost (or not?)

Yes, when all your threads are busy executing the `item()` call, a Pulse will be lost.

But that does not have to be a problem. A busy consumer re-checks `_itemQ.Count` under the lock before it waits again, so it picks up the queued item without needing a Pulse. You are pulsing after every Enqueue(), and basically a Pulse is only required when the queue's Count has gone from 0 to 1. You have many more Pulses than needed.

But if you tried to optimize the number of Pulses, you would probably get into trouble. Because Wait/Pulse is stateless, you should use it carefully.
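
To make this concrete, here is a minimal sketch that reuses the same Wait/Pulse pattern as the question. The class name `LostPulseDemo`, the `Run` method, the use of `int?` items instead of `Action`, and the 20 ms simulated work are all assumptions made for the example. With two slow workers and ten quickly enqueued items, most Pulses arrive while nobody is waiting, yet every item is still processed, because each worker checks the count before calling `Monitor.Wait`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class; the names here are illustrative only.
static class LostPulseDemo
{
    static readonly object Locker = new object();
    static readonly Queue<int?> Items = new Queue<int?>();
    static int _processed;

    // Same shape as the Consume loop in the question.
    static void Consume()
    {
        while (true)
        {
            int? item;
            lock (Locker)
            {
                // Wait only if there is nothing to do. A worker coming back
                // from a long item sees Count > 0 and skips the Wait entirely.
                while (Items.Count == 0) Monitor.Wait(Locker);
                item = Items.Dequeue();
            }
            if (item == null) return; // Exit signal
            Thread.Sleep(20);         // Simulate work (assumed duration)
            Interlocked.Increment(ref _processed);
        }
    }

    static void Enqueue(int? item)
    {
        lock (Locker)
        {
            Items.Enqueue(item);
            Monitor.Pulse(Locker); // Has no effect if no thread is waiting
        }
    }

    // Returns how many items were processed by workerCount consumers.
    public static int Run(int workerCount, int itemCount)
    {
        _processed = 0;
        var workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
            (workers[i] = new Thread(Consume)).Start();

        for (int i = 0; i < itemCount; i++) Enqueue(i);
        for (int i = 0; i < workerCount; i++) Enqueue(null); // One exit signal per worker

        foreach (Thread worker in workers) worker.Join();
        return _processed;
    }

    static void Main()
    {
        Console.WriteLine("Processed " + Run(2, 10) + " of 10 items");
    }
}
```

The state that matters lives in the queue itself, not in the Pulse, which is why the `while (Items.Count == 0)` check makes lost Pulses harmless here.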

H H
  • Henk, I don't understand (sorry). _and basically a Pulse is only required when queue_... OK, I'm queuing, but a pulse is lost... Can that lead to a situation where one work item doesn't get processed? – Royi Namir Aug 02 '12 at 13:59
  • Foolish me, I didn't look at the while loop. It keeps dequeuing until the count reaches 0. So basically I can push 1000 items with only 2 pulses, and the queue will be drained to 0 (right?) – Royi Namir Aug 02 '12 at 14:20
  • The number of pulses needed will depend on the speed difference between Producer and Consumer. In this case the Consumers are slow and you only need 2 Pulses, correct. – H H Aug 02 '12 at 14:23