There are several problems with your code:
- The
while (!await buffer.SendAsync(x))
loop is asking for trouble. If the SendAsync
fails once to send the message, it will fail forever. The SendAsync
fails to send a message only if the target block responds to the OfferMessage
with DecliningPermanently
. This happens when the target block has completed, either manually or as a result of an exception, and it doesn't accept any more messages. In your case the SendAsync
will always succeed, so it's not a problem, but in other cases it could throw your code into an infinite loop pretty easily.
- Creating multiple
Random
instances in quick succession may result to all instances being initialized with the same seed, and so producing the same random sequence of numbers. AFAIK this is a problem for .NET Framework, not for the .NET Core / .NET 5. IMHO it is safer if you use a single Random
instance, and synchronize the access to it (because the Random
class is not thread-safe).
- The
while (!t.IsCompleted)
loop introduces a race condition. The task may be completed at a moment that the buffer
still contains messages. You could try fixing it like this: while (!t.IsCompleted && buffer.Count > 0)
, but this would just exchange one race condition for another. These properties are not intended for controlling the execution flow. The correct way is to use a signaling mechanism, and specifically the OutputAvailableAsync
method, as shown here.
- Creating 10 tasks manually and awaiting them with
await Task.WhenAll
defeats the purpose of using the TPL Dataflow in the first place. This library includes powerful components that can do the same thing easier, with more options, and with better behavior in case of exceptions. Like the TransformBlock
for example. Below is how I would refactor your code, to take advantage of the power of this component:
public static async IAsyncEnumerable<int> Exec()
{
const int commandTime = 10;
var random = new Random();
var block = new TransformBlock<object, int>(async _ =>
{
int x; lock (random) x = random.Next(0, commandTime);
await Task.Delay(x);
return x;
}, new ExecutionDataflowBlockOptions()
{
EnsureOrdered = false,
MaxDegreeOfParallelism = 5, // Optional
});
// Feeder
_ = Task.Run(async () =>
{
try
{
foreach (var _ in Enumerable.Range(0, 10))
{
bool accepted = await block.SendAsync(null);
if (!accepted) break; // The block has failed
}
block.Complete();
}
catch (Exception ex)
{
((IDataflowBlock)block).Fault(ex);
}
});
// Emit the results as they become available
while (await block.OutputAvailableAsync())
{
while (block.TryReceive(out var item))
{
yield return item;
}
}
await block.Completion; // Propagate possible exception
}
The input messages that are sent to the TransformBlock
are irrelevant in this case, so I declared the TInput
as object
, and I passed null
s as messages.
The // Feeder
task demonstrates how to feed the TransformBlock
with messages in a separate asynchronous workflow, that doesn't interfere with the results-producing loop. It is not really nessesary for this specific example, where a simple foreach (var _ in Enumerable.Range(0, 10)) block.Post(null);
would suffice.
Instead of a fire-and-forget task _ = Task.Run
, you could also implement the feeder as an async void
method. In practice it will make no difference, but in theory the async void
is the more responsible option, because it will propagate any unhandled exception instead of swallowing it.