1

There is a continuous data flow from a source;

I want to keep a certain amount of the most current data from that flow. If I use BufferBlock and BoundedCapacity = 200; It'll keep the first 200 data from flow, not the N latest 200. Basically, I want to build a FIFO buffer, The buffer will drop the previous data if there is new data coming and the buffer is full.

I know the behavior of the BroadCastBlock but it keeps just a single message. I would like to have the same behavior but keeping more messages instead of one. I want to store the last few items sent. I tried to use BroadCastBlock with DataflowBlockOptions() { BoundedCapacity = 250 } but it didn't work.

TPLBlockOptions = new DataflowBlockOptions() { BoundedCapacity = 200 };
TPLBlock = new BroadcastBlock<Data>(z => z, TPLBlockOptions);

while(IsDataFlowActive)
{
    await TPLBlock.SendAsync(rawData);
}

For the consumer part, I don't know when to use the data. But the data in the Buffer must be up to date.

Can I do that with the TPL library? I saw and read this post and its answers. If this is a duplicate of it, I'll delete this post. Any advice would be appreciated.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
Salih Karagoz
  • 2,189
  • 2
  • 22
  • 35
  • A simple structure with a `lock` and a queue with an awaitable send. When a message comes in, it adds to the queue, dequeues if it needed to maintain a certain length with a send that can be awaited which would just pop from the queue. You could wrap this in the interface for a custom block, but its not needed. Also Rx has some tools that would achieve this fairly easily and would work well with Dataflow – TheGeneral Aug 19 '21 at 22:08
  • Yeap, I think it's a duplicate of the [TPL dataflow process N latest messages](https://stackoverflow.com/questions/44873603/), unless you want this functionality in a special `BufferBlock` instead of a processing block like an `ActionBlock`. But then you'll have to link this special `BufferBlock` to some other processing block with its own input queue, and the behavior of the whole system will become a bit ambiguous. Essentially an influx of new messages will cause all the old messages in the special `BufferBlock` to be dropped, while the old messages in the processing block will be preserved. – Theodor Zoulias Aug 19 '21 at 22:09
  • @TheGeneral are you referring to the Rx [`Replay`](https://stackoverflow.com/questions/68813914/make-system-reactive-repeat-last-x-items-on-new-subscribe) operator? AFAIK the interoperability between TPL Dataflow and Rx is an one way road. You can get an `IObservable` from a dataflow block (by using the [`AsObservable`](https://stackoverflow.com/questions/44579543/using-asobservable-to-observe-tpl-dataflow-blocks-without-consuming-messages) extension method), but you can't get a dataflow block from an `IObservable`. – Theodor Zoulias Aug 19 '21 at 22:24
  • Another somewhat related question: [BlockingCollection Max Size](https://stackoverflow.com/questions/15928642/blockingcollection-max-size). Using a special `BlockingCollection` that stores only the N latest items in a TPL Dataflow solution should be doable, but still quite awkward. – Theodor Zoulias Aug 19 '21 at 22:31
  • @TheodorZoulias in regards to RX, nah not really referring to replay, however I would have to defer to an RX expert. Also I like your answer. It would be a fun question to answer in any sense, unfortunately I am too busy :/ and also its a duplicate – TheGeneral Aug 19 '21 at 23:09
  • @TheGeneral I am not thrilled with my answer honestly. Every time I have to write a custom TPL Dataflow block, it looks more complicated than it should be. The damn library is not easily extensible. – Theodor Zoulias Aug 20 '21 at 00:12
  • Not even Microsoft is able to extend the library easily. Adding support for `IAsyncEnumerable` to the `TransformManyBlock` [was deemed too compicated](https://github.com/dotnet/runtime/issues/30863#issuecomment-899982598), and was postponed for .NET 7. – Theodor Zoulias Aug 20 '21 at 00:19
  • @TheodorZoulias I know exactly what you mean, dataflow is definitely fun, but overly verbose in a lot of ways. But it still has a soft spot for me – TheGeneral Aug 20 '21 at 02:00

0 Answers0