I have a producer sending data to a BufferBlock
and when all data has been read from the source, it calls Complete()
.
The default behaviour is that when the completion is called, even if the buffer still has messages, it propagates the completion down the pipeline.
Is there a wait to tell a block: Propagate the completion only once your buffer is empty?
When the completion occurs, I get an exception on Receive
: InvalidOperationException: 'The source completed without providing data to receive.'
I am currently using:
var bufferBlock = new BufferBlock<string>();
var transformBlock = new TransformBlock<string, string>(s =>
{
Thread.Sleep(50);
return s;
});
bufferBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var i in Enumerable.Range(0, 10))
bufferBlock.Post(i.ToString());
bufferBlock.Complete();
while (!transformBlock.Completion.IsCompleted)
Console.WriteLine(transformBlock.Receive());
To avoid it I am currently using:
while (bufferBlock.Count > 0)
await Task.Delay(100);
bufferBlock.Complete();
which does not sound like a really clean solution.
Is it a race condition? I.E. The block flagging as not completed and them completing while I call receive?
I guess I could replace !transformBlock.Completion.IsCompleted
with block.OutputAvailableAsync
is that right?