I'm experimenting with TPL Dataflow before porting it into my production code. The production code is a classic producer/consumer system: producer(s) produce messages (from the financial domain) and consumers process those messages.
What I'm interested in is how stable the system stays if at some point the producer(s) produce much faster than the consumers can handle (will the system blow up, or what will happen?) and, more importantly, what to do in those cases.
So, in an attempt to reproduce this in a similarly simple application, I came up with the following.
var bufferBlock = new BufferBlock<Item>();

var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    BoundedCapacity = 100000
};

var dataFlowLinkOptions = new DataflowLinkOptions
{
    PropagateCompletion = true
};

var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
    executionDataflowBlockOptions);

bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);

for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem());
}

bufferBlock.Complete();
Console.ReadLine();
Item is a very simple class:
internal class Item
{
    public Item(string itemId)
    {
        ItemId = itemId;
    }

    public string ItemId { get; }
}
GenerateItem simply news up an Item:
static Item GenerateItem()
{
    return new Item(Guid.NewGuid().ToString());
}
Now, to imitate a not-so-fast consumer, I made ProcessItem pause for 100 ms:
static async Task ProcessItem(Item item)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #{item.ItemId} item.");
}
Executing this results in an OOM exception after 20 seconds or so.
Then I went on and added more consumers (more ActionBlocks, up to 10), which buys some more time, but eventually leads to the same OOM exception.
I also noticed that the GC is under huge pressure (the VS 2015 Diagnostics tool shows the GC running almost all the time), so I introduced object pooling for Item (a very simple one, essentially a ConcurrentBag storing the items), but I'm still hitting the same wall (the same OOM exception is thrown).
To give some details on what is in memory and why it runs out:

- The biggest objects by size are of type SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item> and ConcurrentQueue+Segment<TplDataFlow.Item>.
- I see that the BufferBlock's InputBuffer is full of Items (Count=14,562,296).
- Since I set BoundedCapacity for the ActionBlock(s), their input buffers are also close to the configured number (InputCount=99,996).
To make sure a slower producer would let the consumers keep up, I made the producer sleep between iterations:
for (int i = 0; i < int.MaxValue; i++)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());
}
And it works fine: no exception is thrown, memory usage stays low, and I don't see any GC pressure anymore.
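Instead of sleeping on a timer, I suspect the idiomatic throttle would be to also give the BufferBlock a BoundedCapacity and await SendAsync, so the producer is suspended whenever the pipeline is full. A self-contained sketch of that idea (the capacity, item count, and delay are small arbitrary numbers so it finishes quickly; in my real test they'd be 100000, int.MaxValue, and 100 ms). Is this the right approach?

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

internal static class ThrottledProducer
{
    public static void Main()
    {
        RunAsync().Wait(); // no async Main in C# 6
    }

    private static async Task RunAsync()
    {
        // Bound the BufferBlock too, not only the ActionBlock.
        var buffer = new BufferBlock<string>(
            new DataflowBlockOptions { BoundedCapacity = 100 });

        var consumer = new ActionBlock<string>(
            async id =>
            {
                await Task.Delay(TimeSpan.FromMilliseconds(1)); // stand-in for the 100 ms work
                Console.WriteLine($"Processing #{id} item.");
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 100
            });

        buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 1000; i++)
        {
            // Awaiting SendAsync suspends the loop while both blocks are full,
            // so in-flight items never exceed the two BoundedCapacity values.
            await buffer.SendAsync(Guid.NewGuid().ToString());
        }

        buffer.Complete();
        await consumer.Completion;
    }
}
```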
So I have a few questions:

- Am I doing anything inherently wrong while trying to reproduce the very-fast-producer/slow-consumer(s) scenario with TPL Dataflow building blocks?
- Is there any way to make this work and not fail with an OOM exception?
- Any comments/links on best practices for handling this kind of scenario (very fast producer/slow consumers) in a TPL Dataflow context?
- My understanding of the problem is: since the consumers can't keep up, the BufferBlock's internal buffer fills with messages very fast and holds on to them until one of the consumers comes back to ask for the next message, so the application runs out of memory (due to the filled-up internal buffer of the BufferBlock). Would you agree with this?
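To check that understanding, I ran a tiny experiment of my own (not from the production code): an unbounded BufferBlock accepts every Post and its Count just keeps growing, while a bounded one starts declining items once full:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

internal static class BoundedDemo
{
    public static void Main()
    {
        var unbounded = new BufferBlock<int>();
        var bounded = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 10 });

        for (int i = 0; i < 1000; i++)
        {
            unbounded.Post(i); // always accepted: Count keeps growing
            bounded.Post(i);   // returns false once 10 items are queued
        }

        Console.WriteLine(unbounded.Count); // 1000
        Console.WriteLine(bounded.Count);   // 10
    }
}
```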
I'm using the Microsoft.Tpl.Dataflow package, version 4.5.24, on .NET 4.5 (C# 6). The process is 32-bit.