
I'm trying to write a Windows service whose producers and consumers work like this:

  • Producer: At scheduled times, get all unprocessed items (Processed = 0 on their row in the db) and add to the work queue each one that isn't already in it
  • Consumer: Constantly pull items from the work queue, process them, and update the db (Processed = 1 on their row)

I've looked for examples of this exact data flow in C#/.NET so I can leverage the existing libraries, but so far I haven't found one.

I see this example on https://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html:

private static void Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        queue.Post(value);
    }

    queue.Complete();
}

private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        ret.Add(await queue.ReceiveAsync());
    }

    return ret;
}

Here's the "idea" of what I'm trying to modify that to do:

while(true)
{
    if(await WorkQueue.OutputAvailableAsync())
    {
        ProcessItem(await WorkQueue.ReceiveAsync());
    }
    else
    {
        await Task.Delay(5000);
    }
}

...would be how the Consumer works, and

MyTimer.Elapsed += Produce;

static async void Produce(object source, ElapsedEventArgs e)
{
     IEnumerable<Item> items = GetUnprocessedItemsFromDb();
     foreach(var item in items)
         if(!WorkQueue.Contains(w => w.Id == item.Id))
             WorkQueue.Enqueue(item);  
}

...would be how the Producer works.

That's a rough idea of what I'm trying to do. Can any of you show me the right way to do it, or link me to the proper documentation for solving this type of problem?

Theodor Zoulias
user7127000
  • What's not working? If you need a code review then post your question on the code review site. – CodingYoshi Mar 17 '17 at 15:36
  • Your description includes a race condition. It is possible that the consumer can be processing data from the work queue while the producer is writing the same data to the work queue. You specify that the producer only places items in the work queue that are not currently in the work queue. That does not prevent the processing of duplicate items. – Jim Rogers Mar 17 '17 at 16:00
  • Right. The restriction of "only enqueueing items that aren't already in the queue" is very odd. Have you considered using a persistent queue and using `Processed = 1` for "in queue" and `Processed = 2` for "processed"? – Stephen Cleary Mar 17 '17 at 16:56
  • @StephenCleary I guess I would only dequeue an item if it has been processed. That way I know that if it's in the queue it is not currently being processed, nor has it been processed. Something like that. – user7127000 Mar 18 '17 at 02:35

1 Answer


Creating a custom BufferBlock<T> that rejects duplicate messages is anything but trivial. The TPL Dataflow components do not expose their internal state for the purpose of customization. You can see here an attempt to circumvent this limitation, by creating a custom ActionBlock<T> with an exposed IEnumerable<T> InputQueue property. The code is lengthy and obscure, and creating a custom BufferUniqueBlock<T> might need double the amount of code, because this class implements the ISourceBlock<T> interface too.

My suggestion is to find some other way to avoid processing an Item twice, instead of preventing duplicates from entering the queue. Maybe you could give the Consumer the responsibility of querying the database and checking whether the currently received item is still unprocessed, before actually processing it.
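A minimal sketch of that idea follows, assuming a single consumer and the System.Threading.Tasks.Dataflow NuGet package. The `Item` class, the `DedupSketch` class, and the `Seed`, `IsProcessed` and `MarkProcessed` helpers are hypothetical names invented for illustration, and an in-memory dictionary stands in for the real database so the example can run on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public sealed class Item
{
    public int Id { get; set; }
}

public static class DedupSketch
{
    // Hypothetical stand-in for the database table: Id -> Processed flag.
    private static readonly ConcurrentDictionary<int, bool> Db =
        new ConcurrentDictionary<int, bool>();

    // Hypothetical helper: inserts an unprocessed row (Processed = 0).
    public static void Seed(int id) => Db[id] = false;

    // Hypothetical helpers: a real service would query and update the db here.
    private static bool IsProcessed(int id) => Db.TryGetValue(id, out var done) && done;
    private static void MarkProcessed(int id) => Db[id] = true;

    // The producer posts every unprocessed row and never inspects the queue,
    // so duplicates are allowed to enter it.
    public static void Produce(BufferBlock<Item> queue)
    {
        foreach (var row in Db)
        {
            if (!row.Value) queue.Post(new Item { Id = row.Key });
        }
    }

    // The consumer re-checks the flag right before processing,
    // so a duplicate of an already handled item is skipped.
    public static async Task<List<int>> ConsumeAsync(BufferBlock<Item> queue)
    {
        var handled = new List<int>();
        while (await queue.OutputAvailableAsync())
        {
            var item = await queue.ReceiveAsync();
            if (IsProcessed(item.Id)) continue;

            // ... the actual work on the item would go here ...
            MarkProcessed(item.Id);
            handled.Add(item.Id);
        }
        return handled;
    }

    public static async Task Main()
    {
        Seed(1);
        Seed(2);

        var queue = new BufferBlock<Item>();
        Produce(queue);
        Produce(queue);   // a second timer tick enqueues the same rows again
        queue.Complete(); // demo only: lets the consumer loop end

        var handled = await ConsumeAsync(queue);
        Console.WriteLine(string.Join(", ", handled)); // each Id appears once
    }
}
```

In a long-running service the queue would not be completed, so `OutputAvailableAsync` would simply wait for the next item. With more than one consumer the check and the update are not atomic, so the "claim" would have to happen in the database itself, for example with a conditional update along the lines of the intermediate status value suggested in the comments.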

Theodor Zoulias