I have a producer sending data to a BufferBlock and when all data has been read from the source, it calls Complete().

The default behaviour is that when the completion is called, even if the buffer still has messages, it propagates the completion down the pipeline.

Is there a way to tell a block: propagate the completion only once your buffer is empty?

When the completion occurs, I get an exception on Receive: InvalidOperationException: 'The source completed without providing data to receive.'

I am currently using:

var bufferBlock = new BufferBlock<string>();

var transformBlock = new TransformBlock<string, string>(s =>
{
    Thread.Sleep(50);

    return s;
});

bufferBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var i in Enumerable.Range(0, 10))
    bufferBlock.Post(i.ToString());

bufferBlock.Complete();

while (!transformBlock.Completion.IsCompleted)
    Console.WriteLine(transformBlock.Receive());

To avoid it I am currently using:

while (bufferBlock.Count > 0)
    await Task.Delay(100);

bufferBlock.Complete();

which does not sound like a really clean solution.

Is it a race condition? I.e., is the block reporting that it is not completed and then completing while I call Receive?

I guess I could replace !transformBlock.Completion.IsCompleted with transformBlock.OutputAvailableAsync(). Is that right?

  • So far I can't reproduce this behavior, could you provide a minimal example that illustrates this behavior? The only time when completion should be propagated is when the buffer empties out. – JSteward Jul 26 '18 at 15:51
  • It would probably be most helpful to see the code used to send data into the pipeline, I suspect that's where the error is, but can't be sure. – JSteward Jul 26 '18 at 15:56
  • Will write a console app to reproduce it and add a code example here. It's simply a producer sending X elements to the `BufferBlock` and then calling `Complete` while the other linked blocks are still processing. I get an exception stating the block was completed before the data could be processed. – DWorkAccount Jul 27 '18 at 13:21
  • @JSteward I figured out it was actually processing all the messages, it was just the message that was misleading. – DWorkAccount Jul 30 '18 at 08:33
  • To await completion, await the Completion task of the last block. In this case, `await transformBlock.Completion`. – Panagiotis Kanavos Jul 30 '18 at 15:20

2 Answers

To await completion of a pipeline, you should await the Completion task of the last block in the pipeline. In this case you should change your code to:

foreach (var i in Enumerable.Range(0, 10))
    bufferBlock.Post(i.ToString());

bufferBlock.Complete();

await transformBlock.Completion;

This is demonstrated in the "Completing a pipeline" and "Waiting for the pipeline to finish" paragraphs of "Walkthrough: Creating a Dataflow Pipeline".
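
One caveat worth keeping in mind: a TransformBlock is also a source, and its Completion task finishes only once its output buffer has been emptied. If nothing consumes that output, awaiting transformBlock.Completion will not return. The following is a minimal sketch, not the walkthrough's code, that assumes a terminal ActionBlock is linked at the end so that the last block's Completion can be awaited. The names PipelineSketch, RunAsync and printBlock are illustrative and do not appear in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Runs the pipeline and returns the items seen by the terminal block.
    public static async Task<List<string>> RunAsync()
    {
        var received = new List<string>();

        var bufferBlock = new BufferBlock<string>();
        var transformBlock = new TransformBlock<string, string>(async s =>
        {
            await Task.Delay(50); // simulated work
            return s;
        });

        // Hypothetical terminal block: consumes the output of transformBlock.
        // An ActionBlock processes one item at a time by default.
        var printBlock = new ActionBlock<string>(s => received.Add(s));

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        bufferBlock.LinkTo(transformBlock, linkOptions);
        transformBlock.LinkTo(printBlock, linkOptions);

        foreach (var i in Enumerable.Range(0, 10))
            bufferBlock.Post(i.ToString());

        bufferBlock.Complete();

        // Finishes only after every posted item has flowed through all blocks.
        await printBlock.Completion;
        return received;
    }

    public static async Task Main()
    {
        foreach (var item in await RunAsync())
            Console.WriteLine(item);
    }
}
```

With this shape the producer can call Complete() as soon as it has posted everything; completion reaches printBlock only after the upstream buffers have drained.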

TransformBlock already has its own input and output buffers, which means anything posted to the input BufferBlock is forwarded to the TransformBlock immediately. It would be better to use a different block for testing purposes. The walkthrough shows a nice example: one TransformBlock downloads page contents, another parses them, and so on.

Just be careful of various ... unfortunate coding practices, like creating a new HttpClient instance each time. The downloader could be changed to:

// Reuse a single HttpClient instance for all downloads.
var httpClient = new HttpClient();
var downloadString = new TransformBlock<string, string>(async uri =>
{
    Console.WriteLine("Downloading '{0}'...", uri);

    return await httpClient.GetStringAsync(uri);
});

Panagiotis Kanavos

Yes, the correct way to retrieve messages from a block manually is to use the OutputAvailableAsync method in combination with TryReceive:

while (await transformBlock.OutputAvailableAsync())
{
    while (transformBlock.TryReceive(out var item))
    {
        Console.WriteLine(item);
    }
}
await transformBlock.Completion; // Required to propagate exceptions

The properties BufferBlock.Count, TransformBlock.OutputCount, etc. are only suitable for monitoring and statistics. Using them to control the flow of data is, in most cases, an indication of possible race conditions and lurking bugs.
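
As a self-contained sketch of the OutputAvailableAsync/TryReceive pattern, the program below posts ten items, completes the buffer immediately without polling Count, and still receives every item. The names DrainSketch and DrainAsync and the ToUpperInvariant transform are illustrative assumptions, not part of the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DrainSketch
{
    // Posts ten items, completes the source, and drains the transform block.
    public static async Task<List<string>> DrainAsync()
    {
        var bufferBlock = new BufferBlock<string>();
        var transformBlock = new TransformBlock<string, string>(s => s.ToUpperInvariant());

        bufferBlock.LinkTo(transformBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var i in Enumerable.Range(0, 10))
            bufferBlock.Post("item" + i);

        // No need to wait for bufferBlock.Count to reach zero first.
        bufferBlock.Complete();

        var results = new List<string>();

        // OutputAvailableAsync yields false once the block has completed
        // and has no more output to offer.
        while (await transformBlock.OutputAvailableAsync())
        {
            while (transformBlock.TryReceive(out var item))
                results.Add(item);
        }

        await transformBlock.Completion; // surfaces any exception from the block
        return results;
    }

    public static async Task Main()
    {
        foreach (var item in await DrainAsync())
            Console.WriteLine(item);
    }
}
```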

Theodor Zoulias