
I've read a few similar-but-not-the-same questions while trying to find answers, for example: How to consume a BlockingCollection<T> in batches

However, the approach in that question does not use GetConsumingEnumerable, which seems fishy.

What is the correct way to block the producers while the consumer (there should be only one) empties the collection?

[We want to process in batches because each batch makes a web service call, which would be a bottleneck if every single message/item needed its own call. Batching the messages/items is the solution to this bottleneck.]

Ideally:

1) Receive message

2) Start a new producer task to push it into the collection

3) When the collection is 'full' (arbitrary limit), block all producers, start a new consumer task to consume ALL of the collection, then unblock the producers.

In other words, I want (parallel producers) xor (single consumer) acting on the collection at any time.

Seems like this should have been done before, but I can't seem to find a code snippet that specifically acts this way.

Thanks for any help.

Guy
  • It sounds as if you want to run the producers and the consumer serially, not in parallel. If you don't mind the producers running while the consumer is processing, then a double buffer would be better than a single collection. – Euphoric Apr 06 '15 at 20:17
  • After the queue becomes full, you normally wouldn't wait for the queue to be completely empty before allowing the producers to start populating it again - why do you want to do that? – Matthew Watson Apr 06 '15 at 20:17
  • "Normally wouldn't wait" = exactly why I posted this. I have a requirement (to avoid data loss if the program dies) to process batches of N messages or every M milliseconds. If messages are coming in very quickly and producing into the same BlockingCollection, we check each time if the collection has N items in it. Once this is true, we spin off (what should be a single) consumer task to batch process them all. However, if the consumer grabs ONE item via foreach loop, then another message/producer comes in, it will hit the limit and spawn another consumer task. Trying to avoid this. – Guy Apr 06 '15 at 20:35

2 Answers


With this model all of the work is serialized, which is to say you never have more than one "thing" working at a time: either the producer is working or the consumer is. Because of this, you don't really need a collection that is manipulated by both a producer and a consumer. Instead, you can have a producer that produces batches as a traditional collection, which the consumer then consumes once the batch is complete. It could look something like this:

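// Builds one batch from the message. The implementation may fan out
// to several parallel workers internally, as long as it returns a single list.
public Task<List<Thing>> Produce(Message message)
{
    //...
}

// Sends one complete batch (for example, a single web service call).
public Task Consume(List<Thing> data)
{
    //...
}

public async Task MessageReceived(Message message)
{
    // Strictly alternate: the next batch is not produced until the
    // previous one has been fully consumed.
    while (HaveMoreBatches(message))
    {
        await Consume(await Produce(message));
    }
}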

This lets you produce a batch, then consume it, then produce another batch, then consume it, etc. until there are no more batches to produce.
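To make the alternation concrete, here is a minimal, self-contained sketch of the same produce-then-consume loop. Everything in it is an assumption made for illustration: the `BatchPipeline` class, the `Run` method, the fixed-size slicing of the input, and the `SentBatchSizes` list that stands in for the real web service call. The producing step is parallelized with `Task.WhenAll`, but only one batch exists at a time and it is fully consumed before the next one is produced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchPipeline
{
    // Hypothetical stand-in for the web service: records the size of each batch sent.
    public static readonly List<int> SentBatchSizes = new List<int>();

    // Produces ONE batch. The per-item work runs in parallel, but the
    // method only completes once every item of the batch is ready.
    public static async Task<List<string>> Produce(IEnumerable<int> rawItems)
    {
        IEnumerable<Task<string>> work = rawItems.Select(i => Task.Run(() => "item-" + i));
        string[] results = await Task.WhenAll(work);
        return results.ToList();
    }

    // Consumes one complete batch. A real implementation would make a
    // single web service call here instead of recording the size.
    public static Task Consume(List<string> batch)
    {
        SentBatchSizes.Add(batch.Count);
        return Task.CompletedTask;
    }

    // Alternates strictly between producing and consuming, so producers
    // and the consumer never touch shared state at the same time.
    public static async Task Run(IReadOnlyList<int> items, int batchSize)
    {
        for (int offset = 0; offset < items.Count; offset += batchSize)
        {
            IEnumerable<int> slice = items.Skip(offset).Take(batchSize);
            await Consume(await Produce(slice));
        }
    }
}
```

Calling `await BatchPipeline.Run(items, 4)` with ten items would send batches of 4, 4 and 2. Note that this sketch has no timer, so the "every M milliseconds" part of the requirement is not covered.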

Servy
  • That's not completely correct, according to the question there are multiple producers. – Ben Voigt Apr 06 '15 at 20:25
  • @BenVoigt But they only produce a single batch, meaning that the implementation of `Produce` would be multithreaded (or parallelized in some other way), if the OP wants it to be. – Servy Apr 06 '15 at 20:30
  • Agree with @Servy. We have 'messages' coming into the program very quickly, and each one spawns a producer task to handle the message and push it into the BlockingCollection (or so I think...). Basically I want parallel-producers serialized with one consumer. If possible. – Guy Apr 06 '15 at 20:39

Going by your (somewhat vague) description, I believe a double buffer is what you want.

Simply create two buffers. Producers write into the first one. When it gets full, or when a timer elapses, it is "swapped" for the second one and the producers start writing into that instead. The consumer then starts reading the first, now full, buffer.

This allows the producers and the consumer to run at the same time, and it makes sure the consumer handles all previously created work as one batch before repeating the loop.
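A rough sketch of the swap, assuming a single consumer and a plain `lock` for synchronization. The `DoubleBuffer<T>` class and its `Add`, `Count` and `Swap` members are names invented for this example, not an existing library type. The lock is held only for the duration of an insert or a swap, so producers are never blocked while the consumer processes a batch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: two lists, one being filled, one being drained.
public sealed class DoubleBuffer<T>
{
    private readonly object _gate = new object();
    private List<T> _active = new List<T>(); // producers write here
    private List<T> _spare = new List<T>();  // last batch handed to the consumer

    // Called from any producer thread.
    public void Add(T item)
    {
        lock (_gate) { _active.Add(item); }
    }

    // Number of items waiting in the buffer that producers are filling.
    public int Count
    {
        get { lock (_gate) { return _active.Count; } }
    }

    // Called by the single consumer when the buffer is full or a timer fires.
    // Returns the filled buffer and redirects producers to the other one.
    // The returned list is cleared and reused on the NEXT call to Swap, so the
    // consumer must finish with one batch before asking for another.
    public List<T> Swap()
    {
        lock (_gate)
        {
            List<T> filled = _active;
            _spare.Clear();
            _active = _spare;
            _spare = filled;
            return filled;
        }
    }
}
```

The consumer loop would call `Swap()` and send the returned list as one batch. This sketch does not stop producers when the active buffer grows past the limit while the consumer is still busy; that case needs an explicit blocking mechanism on top.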

Euphoric
  • I thought about something like this, but what happens if the second buffer gets 'filled' by the producers before the first buffer is emptied by the consumer? I still need a way to block the producers until the first buffer is ready/empty. This scenario applies for N-buffers as well. – Guy Apr 07 '15 at 13:33
  • @Guy Then you need to implement a manual blocking mechanism; no collection is going to help you. But even then, if that happens, I believe you have a bigger issue. What would happen if the producers were blocked but messages kept coming? – Euphoric Apr 07 '15 at 13:35
  • Exactly what I'm trying to figure out. Message comes in --> Spawns task for producer --> SOMEHOW block until the queue/collection is ready to accept messages. This seems like something that should have been done before. – Guy Apr 07 '15 at 18:24
  • @Guy Usually, blocking ANYTHING is not a good idea. If you have to block anything for longer than it takes to insert into a collection, then your design is wrong. In your case, you are assuming the consumer is the bottleneck, which is the root of your problem. You need to ensure the consumer is at least as fast as all the producers combined. – Euphoric Apr 07 '15 at 18:38
  • "for longer than inserting into a collection" -- We only want to block for long enough to remove all from the collection. Then the 'consumer' will push that data elsewhere. The problem is that we DON'T want multiple consumers removing data from the collection, because then each one will send data instead of a single 'batch' send. – Guy Apr 07 '15 at 18:55