
When I create a `BatchBlock` with a bounded capacity and call `TriggerBatch()` while (in parallel) posting new items, posting a new item fails while `TriggerBatch()` is executing.

I call `TriggerBatch()` periodically (every X ms) so that data isn't held in the block for too long when the incoming data stream pauses or slows down.
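
A periodic trigger like the one described above can be sketched with a `System.Threading.Timer` that calls `TriggerBatch()` on every tick. This is a minimal sketch assuming the System.Threading.Tasks.Dataflow package is available; `PeriodicTrigger`, `Start` and `DemoAsync` are hypothetical names. The block in the demo is deliberately unbounded; with a bounded block, a timer like this running concurrently with `Post()` is exactly the situation where the failures below appear.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (not part of the Dataflow library): flushes partial batches on a fixed interval.
public static class PeriodicTrigger
{
    // Starts a timer that calls TriggerBatch() every `period`.
    // Dispose the returned Timer to stop the periodic flushing.
    public static Timer Start<T>(BatchBlock<T> block, TimeSpan period) =>
        new Timer(_ => block.TriggerBatch(), null, period, period);

    // Demo: 3 items never fill a batch of 10, so only the timer can release them.
    public static async Task<int> DemoAsync()
    {
        var block = new BatchBlock<int>(10); // unbounded in this sketch
        for (int i = 0; i < 3; i++)
            block.Post(i);

        using (Start(block, TimeSpan.FromMilliseconds(50)))
        {
            // Without the timer, this receive would throw a TimeoutException after 5 seconds.
            int[] batch = await block.ReceiveAsync(TimeSpan.FromSeconds(5));
            return batch.Length;
        }
    }
}
```

The timer only decides *when* to flush; it does not change how the block accepts items, so it does not by itself avoid the bounded-block behaviour discussed in this question.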

For example, the following code prints some "Failed to Post" messages:

    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class Program
    {
        public static void Main(string[] args)
        {
            // Bounded BatchBlock (batches of 10) feeding a sequential consumer
            var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
            var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
            batchBlock.LinkTo(actionBlock);

            var producerTask = Task.Factory.StartNew(() =>
            {
                // Post 10K items; Post() returns false when the item is not accepted immediately
                for (int i = 0; i < 10000; i++)
                {
                    var postResult = batchBlock.Post(i);
                    if (!postResult)
                        Console.WriteLine("Failed to Post");
                }
            });

            var triggerBatchTask = Task.Factory.StartNew(() =>
            {
                // Trigger batches concurrently with the producer
                for (int i = 0; i < 1000000; i++)
                    batchBlock.TriggerBatch();
            });

            producerTask.Wait();
            triggerBatchTask.Wait();
        }

        public static void ProcessBatch(int[] batch)
        {
            Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
        }
    }

Note that this scenario is reproducible only when the `BatchBlock` is bounded.

Am I missing something, or is this an issue with `BatchBlock`?

Al Yaros
  • Somewhat related: [How to call TriggerBatch automagically after a timeout if the number of queued items is less than the BatchSize?](https://stackoverflow.com/questions/9419442/how-to-call-triggerbatch-automagically-after-a-timeout-if-the-number-of-queued-i) – Theodor Zoulias Jun 27 '20 at 20:40

1 Answer


The `BatchBlock` does not really reject the item; it attempts to postpone it. But in the case of `Post()`, postponing is not an option, so `Post()` returns `false`. A simple way to fix this is to use `await batchBlock.SendAsync(i)` instead of `batchBlock.Post(i)` (this also means you need to change `Task.Factory.StartNew(() =>` to `Task.Run(async () =>`, since `Task.Run` unwraps the task returned by an `async` lambda while `Task.Factory.StartNew` does not).
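
A minimal sketch of that change, assuming the System.Threading.Tasks.Dataflow package is available. `SendAsyncRepro` and `RunAsync` are hypothetical names, and a few details are illustrative assumptions rather than part of the answer: the consumer counts items instead of printing them, the trigger loop runs 100,000 times, and the block is completed at the end so the last partial batch is flushed.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SendAsyncRepro
{
    // Same setup as the question, but the producer awaits SendAsync instead of calling Post.
    // Returns how many items were declined and how many reached the ActionBlock.
    public static async Task<(int Declined, int Processed)> RunAsync()
    {
        int processed = 0;
        var batchBlock = new BatchBlock<int>(10,
            new GroupingDataflowBlockOptions { BoundedCapacity = 10000000 });
        // MaxDegreeOfParallelism = 1, so the counter is only updated by one batch at a time.
        var actionBlock = new ActionBlock<int[]>(
            batch => { processed += batch.Length; },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // Task.Run unwraps the async lambda, so awaiting producerTask waits for the whole loop.
        var producerTask = Task.Run(async () =>
        {
            int declined = 0;
            for (int i = 0; i < 10000; i++)
            {
                // A postponed item is awaited instead of failing; false would mean
                // the block declined it permanently (e.g. after Complete()).
                if (!await batchBlock.SendAsync(i))
                    declined++;
            }
            return declined;
        });

        var triggerBatchTask = Task.Run(() =>
        {
            for (int i = 0; i < 100000; i++)
                batchBlock.TriggerBatch();
        });

        int declinedCount = await producerTask;
        await triggerBatchTask;

        // Flush the last partial batch and wait until every batch has been processed.
        batchBlock.Complete();
        await actionBlock.Completion;
        return (declinedCount, processed);
    }
}
```

Because each `SendAsync` is awaited before the next item is sent, the producer has at most one send in flight at a time.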

Why does this happen? According to the source code, if the `BatchBlock` is bounded, `TriggerBatch()` is processed asynchronously, and while it's being processed, no new items are accepted.

In any case, you shouldn't expect `Post()` to always return `true` on a bounded block: if the block is full, `Post()` also returns `false`.
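
The difference between `Post()` and `SendAsync()` on a full bounded block can be sketched with a `BufferBlock<int>` that has room for a single item. `BufferBlock` is swapped in here only because its capacity is easy to reason about; `FullBlockDemo` and `RunAsync` are hypothetical names.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FullBlockDemo
{
    public static async Task<(bool FirstPost, bool SecondPost, bool SendPendingWhileFull, int Received, bool SendResult)> RunAsync()
    {
        // Room for exactly one item and nothing linked to drain it.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = buffer.Post(1);   // accepted: there is room
        bool secondPost = buffer.Post(2);  // false: the block is full and Post never waits

        // SendAsync is postponed instead of failing; its task stays incomplete until room frees up.
        Task<bool> pending = buffer.SendAsync(3);
        bool pendingWhileFull = !pending.IsCompleted;

        int received = await buffer.ReceiveAsync(); // removes item 1, freeing capacity
        bool sendResult = await pending;            // the block now consumes the postponed item 3
        return (firstPost, secondPost, pendingWhileFull, received, sendResult);
    }
}
```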

svick
  • Meanwhile, I'm using a different solution: I introduced another block that accepts the failed items, and I call `TriggerBatch()` serially on both blocks. As for your suggested solution, `async`/`await` will create a task for each incoming item; with a huge burst of events, an unbounded number of tasks could be created and cause out-of-memory issues. – Al Yaros Feb 26 '16 at 18:05
  • @AlYaros No, it won't. If the item is accepted, you get a cached `Task`, so there are no allocations. And if the item is postponed, the code you showed won't add new items until it's accepted. If `await` would cause issues in your actual code, then IMO either you should be able to fix them, or you're going to get issues even without it. – svick Feb 26 '16 at 18:12
  • By the way, thanks for your comments :) I'm not sure this is entirely safe in terms of task memory consumption. I'll take a look at the source code you suggested and test it a bit. Isn't the task created before the block code is executed, regardless of the post result? – Al Yaros Feb 26 '16 at 18:24
  • I would also expect `Post()` to be accepted if the bounded capacity hasn't been reached, regardless of the `TriggerBatch` processing. – shlomiw Feb 28 '16 at 10:45
  • @svick From the source code, the `TriggerBatch` and `OfferMessage` methods execute under `lock(IncomingLock)`. Other than that, what else causes `Post()` to fail while `TriggerBatch()` is executing? I ran some tests and saw that with a batch-trigger interval of 1000 ms, `Post()` returns `false` far less often than with a lower interval such as <100 ms. – Girish Jun 22 '22 at 07:29
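
svick's point about accepted items can be checked with a small sketch: when the target accepts the item immediately, `SendAsync` hands back a task that is already completed, so `await` continues synchronously and no continuation is queued. Whether the returned `Task` instance is a shared cached object is an implementation detail of the library, so this sketch only checks completion. `AcceptedSendDemo` and `AllSendsCompletedSynchronously` are hypothetical names.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AcceptedSendDemo
{
    // Returns true if every SendAsync to an unbounded block came back already completed with true.
    public static bool AllSendsCompletedSynchronously(int count)
    {
        var buffer = new BufferBlock<int>(); // unbounded: every offer is accepted immediately
        for (int i = 0; i < count; i++)
        {
            Task<bool> send = buffer.SendAsync(i);
            // An already-completed task means `await send` would not suspend the caller.
            if (!send.IsCompleted || !send.Result)
                return false;
        }
        return true;
    }
}
```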