4

I have an application that works with a queue of strings, each corresponding to a task the application needs to perform. Strings are added to the queue at random moments: sometimes several times a minute, sometimes only once in a few hours.

Until now I have always used a timer that checks the queue every few seconds and removes any items it finds.

I think there must be a nicer solution than this. Is there a way to get an event, or some similar notification, when an item is added to the queue?

abatishchev
Sophie

3 Answers

5

Yes. Take a look at TPL Dataflow, in particular BufferBlock<T>. It does more or less the same job as BlockingCollection but, by leveraging async/await, avoids the nasty side effect of tying up threads while waiting for items.

So you can:

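using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow NuGet package

class Program
{
    static async Task Main()
    {
        var b = new BufferBlock<string>();
        // Start producer and consumer, then await both so the process stays alive.
        // Both loop forever in this demo.
        await Task.WhenAll(AddToBlockAsync(b), ReadFromBlockAsync(b));
    }

    // Producer: posts one message per second.
    static async Task AddToBlockAsync(BufferBlock<string> b)
    {
        while (true)
        {
            b.Post("hello");
            await Task.Delay(1000);
        }
    }

    // Consumer: ReceiveAsync waits for the next message without blocking a thread.
    static async Task ReadFromBlockAsync(BufferBlock<string> b)
    {
        await Task.Delay(10000); // let some messages buffer up...
        while (true)
        {
            var msg = await b.ReceiveAsync();
            Console.WriteLine(msg);
        }
    }
}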
spender
2

I'd take a look at BlockingCollection<T>.GetConsumingEnumerable. The collection is backed by a ConcurrentQueue<T> by default, and a simple foreach loop over the enumerable is a nice way to take values from the queue automatically as they are added.

There is also an overload that accepts a CancellationToken, so you can cleanly break out of the loop.
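
A minimal sketch of that overload, not a definitive implementation. The class name `CancellableConsumerDemo`, the `Run` method, the sample strings, and the `CountdownEvent` (used only so the demo knows when to cancel) are all assumptions made for illustration. The consumer blocks inside the foreach until an item arrives, and cancelling the token makes the enumeration throw an OperationCanceledException, which ends the loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableConsumerDemo
{
    // Hypothetical helper: consumes two sample items, then the loop is cancelled.
    // Returns the items the consumer processed, in order.
    public static List<string> Run()
    {
        var processed = new List<string>();

        using (var queue = new BlockingCollection<string>())   // ConcurrentQueue<T> underneath
        using (var cts = new CancellationTokenSource())
        using (var twoItemsHandled = new CountdownEvent(2))     // demo-only synchronization
        {
            var consumer = Task.Run(() =>
            {
                try
                {
                    // Blocks while the queue is empty; no timer or polling involved.
                    foreach (var item in queue.GetConsumingEnumerable(cts.Token))
                    {
                        processed.Add(item);
                        twoItemsHandled.Signal();
                    }
                }
                catch (OperationCanceledException)
                {
                    // Expected: the token was cancelled while waiting for the next item.
                }
            });

            queue.Add("task-1");
            queue.Add("task-2");

            twoItemsHandled.Wait(); // wait until both sample items were handled
            cts.Cancel();           // wakes the blocked consumer and ends its loop
            consumer.Wait();
        }

        return processed;
    }
}
```

Calling `CancellableConsumerDemo.Run()` from a `Main` method returns the two sample strings in the order they were added. In a real application the token would more likely be cancelled on shutdown than after a fixed number of items.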

Daniel Kelley
1

Have you looked at BlockingCollection<T>? Its GetConsumingEnumerable() method lets the consumer run an indefinite loop, to which new items are yielded as soon as they become available, with no need for timers or Thread.Sleep calls:

// Common:

BlockingCollection<string> _blockingCollection =
    new BlockingCollection<string>();

// Producer:

for (var i = 0; i < 100; i++)
{
    _blockingCollection.Add(i.ToString());
    Thread.Sleep(500); // Only so you can watch the consumer keep pace. Remove in real code.
}

// Consumer (runs on a different thread from the producer):

foreach (var item in _blockingCollection.GetConsumingEnumerable())
{
    Debug.WriteLine(item);
}
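
The fragments above leave open how the consumer loop ever finishes. One option, sketched here under the assumption that the producer knows when it is done, is to call CompleteAdding(): GetConsumingEnumerable() then drains whatever is left and the foreach exits normally. The class name `CompleteAddingDemo`, the `Run` method, and its `itemCount` parameter are made up for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CompleteAddingDemo
{
    // Hypothetical helper: produces itemCount strings, consumes them on another
    // thread, and returns what the consumer saw, in order.
    public static List<string> Run(int itemCount)
    {
        var consumed = new List<string>();

        using (var queue = new BlockingCollection<string>())
        {
            // Consumer: blocks while the queue is empty and exits once the
            // collection is marked complete and has been drained.
            var consumer = Task.Run(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    consumed.Add(item);
                }
            });

            // Producer: same shape as the loop above, minus the Thread.Sleep.
            for (var i = 0; i < itemCount; i++)
            {
                queue.Add(i.ToString());
            }

            queue.CompleteAdding(); // signals that no more items will be added
            consumer.Wait();        // returns once the foreach has finished
        }

        return consumed;
    }
}
```

For example, `CompleteAddingDemo.Run(3)` returns the strings "0", "1", "2". If the queue is meant to live for the whole lifetime of the application, the CancellationToken overload of GetConsumingEnumerable may be a better fit than CompleteAdding().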
StuartLC