Questions tagged [tpl-dataflow]

TPL Dataflow (TDF) is a .NET library for building concurrent applications. It promotes actor/agent-oriented designs through primitives for in-process message passing, dataflow, and pipelining. TDF builds upon the TPL (Task Parallel Library) in .NET 4 and integrates with async language support in C#, Visual Basic, and F#. TDF lacks join/merge by key (like SSIS) and time-based windowing (available in Rx).

Platform: .NET Framework 4.0 / .NET Core / .NET 6

Promotes: actor/agent programming model

Strengths: limiting parallelism (for example, disk-bound reads and CPU-bound computations can each be given a different limit), data flow via message passing, and integration with other .NET libraries such as Rx and TPL.

Weaknesses: no dataflow merge by key (like the SSIS merge operation), and only the most trivial time windowing (compare BroadcastBlock with Rx BufferWithTime() or the CEP/StreamInsight windowing operators).
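
As a rough illustration of the "limiting parallelism" point above, here is a minimal sketch in which a simulated disk-bound stage and a CPU-bound stage get different limits. The limits (2 and Environment.ProcessorCount), the buffer size, and the simulated read via Task.Delay are assumptions chosen for the example, not recommendations.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottlingSketch
{
    public static async Task Main()
    {
        // Assumed limits: at most 2 concurrent "disk" reads, one CPU worker per core.
        var diskOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
            BoundedCapacity = 16 // SendAsync waits once 16 items are queued
        };
        var cpuOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        // Stage 1: stand-in for an asynchronous disk read (hypothetical workload).
        var load = new TransformBlock<int, byte[]>(async id =>
        {
            await Task.Delay(10);
            return new byte[id];
        }, diskOptions);

        // Stage 2: CPU-side work; Interlocked keeps the shared total thread-safe.
        long totalBytes = 0;
        var process = new ActionBlock<byte[]>(data =>
        {
            Interlocked.Add(ref totalBytes, data.Length);
        }, cpuOptions);

        // Propagate completion so awaiting the last block covers the whole pipeline.
        load.LinkTo(process, new DataflowLinkOptions { PropagateCompletion = true });

        for (int id = 1; id <= 100; id++)
        {
            await load.SendAsync(id);
        }

        load.Complete();
        await process.Completion;
        Console.WriteLine(totalBytes);
    }
}
```

Each block enforces its own MaxDegreeOfParallelism, so the two stages are throttled independently while the link between them carries the messages.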

629 questions
67 votes, 3 answers

Throttling asynchronous tasks

I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time. Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a…
Josh Wyant • 1,177 • 1 • 8 • 13
64 votes, 2 answers

TPL Dataflow, what's the functional difference between Post() and SendAsync()?

I am confused about the difference between sending items through Post() or SendAsync(). My understanding is that in all cases, once an item has reached the input buffer of a data block, control is returned to the calling context, correct? Then why would…
Matt • 7,004 • 11 • 71 • 117
54 votes, 2 answers

Implementing correct completion of a retryable block

Teaser: guys, this question is not about how to implement retry policy. It's about correct completion of a TPL Dataflow block. This question is mostly a continuation of my previous question Retry policy within ITargetBlock. The answer to this…
Oleks • 31,955 • 11 • 77 • 132
34 votes, 5 answers

TPL Dataflow, guarantee completion only when ALL source data blocks completed

How can I rewrite the code so that it completes only when BOTH TransformBlocks have completed? I thought completion means that it is marked complete AND the "out queue" is empty? public Test() { broadCastBlock = new BroadcastBlock(i => …
Matt • 7,004 • 11 • 71 • 117
32 votes, 1 answer

Duplicate exceptions with BroadcastBlock in TPL Dataflow

I am attempting to use TPL Dataflow to create a pipeline. All is working fine so far, with my pipeline defined as follows (although my issue is just with broadcaster, submissionSucceeded, submissionFailed): // Define tasks var…
bornfromanegg • 2,826 • 5 • 24 • 40
26 votes, 3 answers

Benefits of using BufferBlock in dataflow networks

I was wondering if there are benefits associated with using a BufferBlock linked to one or many ActionBlocks, other than throttling (using BoundedCapacity), instead of just posting directly to ActionBlock(s) (as long as throttling is not required).
Dimitri • 6,923 • 4 • 35 • 49
25 votes, 1 answer

What is the difference between System.Threading.Tasks.Dataflow and Microsoft.Tpl.Dataflow

There are two different official TPL Dataflow NuGet packages. I am unsure which one I should use. As far as I understand, the System.Threading.Tasks.Dataflow version is a tiny bit newer than the other, and it seems System.Threading.Tasks.Dataflow…
Freshblood • 6,285 • 10 • 59 • 96
24 votes, 3 answers

Skip Item in Dataflow TransformBlock

TPL Dataflow provides a TransformBlock for transforming input, e.g.: var tb = new TransformBlock(i => i * 2); Is it possible to not output some of the input, e.g. if the input fails some validation test? var tb = new…
Eric J. • 147,927 • 63 • 340 • 553
23 votes, 2 answers

BufferBlock deadlock with OutputAvailableAsync after TryReceiveAll

While working on an answer to this question, I wrote this snippet: var buffer = new BufferBlock(); var producer = Task.Run(async () => { while (true) { await Task.Delay(TimeSpan.FromMilliseconds(100)); …
i3arnon • 113,022 • 33 • 324 • 344
22 votes, 3 answers

I/O performance - async vs TPL vs Dataflow vs RX

I have a piece of C# 5.0 code that generates a ton of network and disk I/O. I need to run multiple copies of this code in parallel. Which of the following technologies is likely to give me the best performance: async methods with await directly use…
22 votes, 2 answers

TPL Dataflow, how to forward items to only one specific target block among many linked target blocks?

I am looking for a TPL data flow block solution which can hold more than a single item, which can link to multiple target blocks, but which has the ability to forward an item to only a specific target block that passes a filter/predicate. At no time…
Matt • 7,004 • 11 • 71 • 117
21 votes, 5 answers

How to call TriggerBatch automagically after a timeout if the number of queued items is less than the BatchSize?

Using the Dataflow CTP (in the TPL), is there a way to call BatchBlock.TriggerBatch automatically after a timeout if the number of currently queued or postponed items is less than the BatchSize? And better: this timeout should be reset to 0 each time…
Softlion • 12,281 • 11 • 58 • 88
21 votes, 1 answer

How can I make sure a dataflow block only creates threads on an on-demand basis?

I've written a small pipeline using the TPL Dataflow API which receives data from multiple threads and performs handling on them. Setup 1 When I configure it to use MaxDegreeOfParallelism = Environment.ProcessorCount (comes to 8 in my case) for…
Jeroen Vannevel • 43,651 • 22 • 107 • 170
21 votes, 1 answer

What are the differences between TPL Dataflow (TDF) and Reactive Extensions?

After days of googling, I still can't decide which one suits which scenario. Of course I would like to use a perfect framework which combines both (unrealistic of course). I even know that it's possible to use them together. But the real question…
naeron84 • 2,955 • 3 • 30 • 37
17 votes, 3 answers

Using async/await and yield return with TPL Dataflow

I am trying to implement a data processing pipeline using TPL Dataflow. However, I am relatively new to dataflow and not completely sure how to use it properly for the problem I am trying to solve. Problem: I am trying to iterate through the list of…
Tejas Vora • 538 • 9 • 19