
My `BufferBlock` from the TPL Dataflow library seems to miss values when the interval between sending messages is short. Here is the code:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    private async static Task Main(string[] args)
    {
        await foreach (var x in Exec())
        {
            Console.WriteLine(x);
        }
    }

    public static async IAsyncEnumerable<int> Exec()
    {
        BufferBlock<int> buffer = new BufferBlock<int>();
        const int commandTime = 10;
        var tasks = Enumerable
            .Range(0, 10)
            .Select(e =>
                Task.Run(async () =>
                {
                    var x = new Random().Next(0, commandTime);
                    await Task.Delay(x);
                    while (!await buffer.SendAsync(x)) ;
                }));

        var t = Task.WhenAll(tasks);

        while (!t.IsCompleted)
        {
            yield return await buffer.ReceiveAsync();
        }
    }
}
```

The `await Task.Delay(x)` represents a call to an external service. When I set `commandTime` to 10, I get only one result (sometimes more), but when I extend the possible execution time of a command (e.g. to 1000), I get all 10. Can someone explain why the `BufferBlock` is not consuming values?

Pawel
  • Have you tried to link the buffer block to an action block that does the WriteLine? – Fildor Aug 16 '21 at 10:09
  • I am not 100% sure, but I am suspicious that your while condition is the culprit. – Fildor Aug 16 '21 at 10:14
  • @Fildor without the `while` the result is the same. I added it because of the method's description. – Pawel Aug 16 '21 at 10:57
  • @Fildor and the main question remains: why is there a correlation between the service's response time and the values reaching the buffer? – Pawel Aug 16 '21 at 11:05
  • Somewhat related: [How to implement an efficient WhenEach that streams an IAsyncEnumerable of task results?](https://stackoverflow.com/questions/58194212/how-to-implement-an-efficient-wheneach-that-streams-an-iasyncenumerable-of-task) – Theodor Zoulias Aug 16 '21 at 11:13

2 Answers


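`SendAsync` and `ReceiveAsync` already wait asynchronously: `ReceiveAsync` until an item is available, `SendAsync` until the block accepts the item (or declines it permanently). No polling loops are necessary.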
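A minimal sketch of that waiting behavior, assuming a modern .NET where `System.Threading.Tasks.Dataflow` is available (the `WaitingDemo` class name is illustrative). `ReceiveAsync` on an empty block returns a task that is not yet completed, and `SendAsync` to a full bounded block returns a pending task until a slot frees up:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class WaitingDemo
{
    // Records the order of events so that the waiting is observable.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        // ReceiveAsync on an empty block does not fail; it waits for an item.
        Task<int> pendingReceive = buffer.ReceiveAsync();
        log.Add($"receive completed early: {pendingReceive.IsCompleted}");
        await buffer.SendAsync(1);
        log.Add($"received {await pendingReceive}");

        // With BoundedCapacity = 1, item 2 fills the block...
        await buffer.SendAsync(2);
        // ...so the send of item 3 is postponed until someone receives.
        Task<bool> secondSend = buffer.SendAsync(3);
        log.Add($"second send completed early: {secondSend.IsCompleted}");

        log.Add($"received {await buffer.ReceiveAsync()}"); // frees the slot
        log.Add($"second send accepted: {await secondSend}");
        log.Add($"received {await buffer.ReceiveAsync()}");
        return log;
    }
}
```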

I have added some output to your code to make this behavior clearer. To make the demonstration easier to follow, the buffer block is bounded to 3 elements.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        await foreach (var x in Exec())
        {
            Console.WriteLine(x);
        }
        Console.WriteLine("READY");
    }

    public static async IAsyncEnumerable<int> Exec()
    {
        // We define a buffer block that can hold 3 elements. The fourth element will have to wait.
        BufferBlock<int> buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 3 });
        const int taskCount = 10;

        var tasks = Enumerable
            .Range(0, taskCount)
            .Select((e, number) =>
                Task.Run(async () =>
                {
                    Console.WriteLine($"Sending {number}...");
                    // From documentation: If the target
                    // postpones the offered element, the element will be buffered until such time that
                    // the target consumes or releases it, at which point the task will complete, with
                    // its System.Threading.Tasks.Task`1.Result indicating whether the message was consumed.
                    // So we don't need a waiting loop.
                    var result = await buffer.SendAsync(number);
                    Console.WriteLine($"Result of SendAsync({number}) is {result}.");
                }))
            // Now our LINQ statement will be executed and 10 Tasks will write into the buffer.
            .ToList();

        // Our writing tasks are running and we receive the results as soon as they are available.
        Console.WriteLine("Waiting for receive.");
        for (int i = 0; i < taskCount; i++)
        {
            // To demonstrate that SendAsync will wait.
            await Task.Delay(2000);
            yield return await buffer.ReceiveAsync();
        }
    }
}
```

Signalling completion

The receive loop above relies on knowing the number of messages. If the receiver does not know it, the senders can signal completion to the `BufferBlock` instead:

```csharp
public static async IAsyncEnumerable<int> Exec()
{
    BufferBlock<int> buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 3 });
    const int taskCount = 10;
    int sentNumbers = 0;
    var tasks = Enumerable
        .Range(0, taskCount)
        .Select((e, number) =>
            Task.Run(async () =>
            {
                await buffer.SendAsync(number);
                // Interlocked (System.Threading) keeps the counter correct across concurrent tasks.
                // When the last send has finished, we signal to the BufferBlock that it should
                // not accept any more messages; items already buffered can still be received.
                if (Interlocked.Increment(ref sentNumbers) == taskCount)
                {
                    buffer.Complete();
                }
            }))
        .ToList();

    // Receives until the block has been completed and all buffered items are consumed.
    while (await buffer.OutputAvailableAsync())
    {
        yield return await buffer.ReceiveAsync();
    }
}
```
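To see what `Complete()` changes, here is a small sketch (the `CompletionDemo` name is illustrative, and it assumes `System.Threading.Tasks.Dataflow` is available). After completion the block declines new messages, so `SendAsync` returns `false`; items accepted earlier can still be drained, and `OutputAvailableAsync` returns `false` once the block is empty instead of waiting forever:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    public static async Task<(bool sentBefore, bool sentAfter, int drained, bool moreAvailable)> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        bool sentBefore = await buffer.SendAsync(1);
        buffer.Complete();

        // A completed block declines permanently, so SendAsync reports false.
        bool sentAfter = await buffer.SendAsync(2);

        // Items accepted before Complete() can still be received.
        int drained = 0;
        while (await buffer.OutputAvailableAsync())
        {
            await buffer.ReceiveAsync();
            drained++;
        }

        // Once the block is empty and completed, this returns false.
        bool moreAvailable = await buffer.OutputAvailableAsync();
        return (sentBefore, sentAfter, drained, moreAvailable);
    }
}
```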
Michael
  • Hi Michael, the delay is in the right place in my code, because it stands for a method that returns a value I then place in the buffer. – Pawel Aug 16 '21 at 10:59
  • You can put a delay in your send task (or wherever you want), and the program will still work. – Michael Aug 16 '21 at 11:04
  • Very nice solution :) I like it!!! That also solves my problem in code. What about the initial question regarding correlation between service response time and the missing values (in my code)? – Pawel Aug 16 '21 at 11:19
  • There was an infinite loop in your original code (`SendAsync` always returns true if no completion was sent to the buffer). The tasks run forever and send data to the buffer at very high speed. This can cause all kinds of problems. – Michael Aug 16 '21 at 13:04
  • Yep, but it still always returned an answer, just a wrong one :) The code also completes every time. – Pawel Aug 16 '21 at 13:05
  • `while (!t.IsCompleted)` is the main reason (my mistake: `while (!await buffer.SendAsync(x))` is not an infinite loop, because you negate the condition). `t` is the waiting task (the return value of `WhenAll`). If your tasks are fast, all sending tasks complete before this check, so the loop body executes only once. If your sending tasks take more time, you will catch more (but not all) values in your loop. – Michael Aug 16 '21 at 13:20
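The extreme case of this race can be reproduced deterministically. In the sketch below (names are illustrative, and `System.Threading.Tasks.Dataflow` is assumed to be available), the producers are awaited before the loop, standing in for producers that finish faster than the consumer. The question's `while (!t.IsCompleted)` loop then never runs its body, and all ten values stay in the buffer:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RaceDemo
{
    public static async Task<(List<int> received, int leftInBuffer)> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        Task t = Task.WhenAll(Enumerable.Range(0, 10).Select(i => buffer.SendAsync(i)));

        // Stands in for "every producer finished before the loop condition was checked".
        await t;

        var received = new List<int>();
        while (!t.IsCompleted) // already false, so the body never runs
        {
            received.Add(await buffer.ReceiveAsync());
        }
        return (received, buffer.Count); // the unread values are still buffered
    }
}
```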

There are several problems with your code:

  1. The `while (!await buffer.SendAsync(x))` loop is asking for trouble. If `SendAsync` fails once to send the message, it will fail forever. `SendAsync` fails to send a message only if the target block responds to `OfferMessage` with `DecliningPermanently`. This happens when the target block has completed, either manually or as a result of an exception, and it doesn't accept any more messages. In your case `SendAsync` will always succeed, so it's not a problem, but in other cases it could easily throw your code into an infinite loop.
  2. Creating multiple `Random` instances in quick succession may result in all instances being initialized with the same seed, and so producing the same sequence of random numbers. AFAIK this is a problem on .NET Framework, not on .NET Core / .NET 5. IMHO it is safer to use a single `Random` instance and synchronize access to it (because the `Random` class is not thread-safe).
  3. The `while (!t.IsCompleted)` loop introduces a race condition. The task may complete at a moment when the buffer still contains messages. You could try fixing it like this: `while (!t.IsCompleted || buffer.Count > 0)`, but this would just exchange one race condition for another. These properties are not intended for controlling the execution flow. The correct way is to use a signaling mechanism, specifically the `OutputAvailableAsync` method, as shown in the refactored example below.
  4. Creating 10 tasks manually and awaiting them with `await Task.WhenAll` defeats the purpose of using TPL Dataflow in the first place. This library includes powerful components that can do the same thing more easily, with more options, and with better behavior in case of exceptions. The `TransformBlock` is one example. Below is how I would refactor your code to take advantage of this component:
```csharp
public static async IAsyncEnumerable<int> Exec()
{
    const int commandTime = 10;
    var random = new Random();

    var block = new TransformBlock<object, int>(async _ =>
    {
        int x; lock (random) x = random.Next(0, commandTime);
        await Task.Delay(x);
        return x;
    }, new ExecutionDataflowBlockOptions()
    {
        EnsureOrdered = false,
        MaxDegreeOfParallelism = 5, // Optional
    });

    // Feeder
    _ = Task.Run(async () =>
    {
        try
        {
            foreach (var _ in Enumerable.Range(0, 10))
            {
                bool accepted = await block.SendAsync(null);
                if (!accepted) break; // The block has failed
            }
            block.Complete();
        }
        catch (Exception ex)
        {
            ((IDataflowBlock)block).Fault(ex);
        }
    });

    // Emit the results as they become available
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var item))
        {
            yield return item;
        }
    }
    await block.Completion; // Propagate possible exception
}
```

The input messages that are sent to the TransformBlock are irrelevant in this case, so I declared the TInput as object, and I passed nulls as messages.

The `// Feeder` task demonstrates how to feed the `TransformBlock` with messages in a separate asynchronous workflow that doesn't interfere with the results-producing loop. It is not really necessary for this specific example, where a simple `foreach (var _ in Enumerable.Range(0, 10)) block.Post(null);` would suffice.
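For comparison, a sketch of that simpler variant. Because the block is unbounded, `Post` accepts every message synchronously, so the block can be completed right away and then drained with the same `OutputAvailableAsync`/`TryReceive` loop. The class name `PostFeederDemo` is illustrative, and the sketch assumes the block stays unbounded (with a `BoundedCapacity`, `Post` could return `false` and drop messages):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostFeederDemo
{
    public static async Task<List<int>> RunAsync()
    {
        const int commandTime = 10;
        var random = new Random();
        var block = new TransformBlock<object, int>(async _ =>
        {
            int x; lock (random) x = random.Next(0, commandTime);
            await Task.Delay(x);
            return x;
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = false, MaxDegreeOfParallelism = 5 });

        // Unbounded block: every Post is accepted immediately, so no feeder task is needed.
        foreach (var _ in Enumerable.Range(0, 10)) block.Post(null);
        block.Complete();

        var results = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var item)) results.Add(item);
        }
        await block.Completion; // propagate a possible exception
        return results;
    }
}
```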

Instead of a fire-and-forget task _ = Task.Run, you could also implement the feeder as an async void method. In practice it will make no difference, but in theory the async void is the more responsible option, because it will propagate any unhandled exception instead of swallowing it.
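A sketch of the `async void` feeder, under the same assumptions as above; `Feed` and `AsyncVoidFeederDemo` are hypothetical names, and the trivial transform `_ => 42` only keeps the example short. If an exception ever escaped the `try`/`catch` of an `async void` method (with no `SynchronizationContext`), it would be raised on the ThreadPool as an unhandled exception, which by default terminates the process rather than being silently lost:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncVoidFeederDemo
{
    // Hypothetical feeder: sends `count` null messages, then completes the block.
    private static async void Feed(ITargetBlock<object> target, int count)
    {
        try
        {
            for (int i = 0; i < count; i++)
            {
                if (!await target.SendAsync(null)) break; // the block has failed
            }
            target.Complete();
        }
        catch (Exception ex)
        {
            target.Fault(ex);
        }
    }

    public static async Task<int> RunAsync()
    {
        var block = new TransformBlock<object, int>(_ => 42);
        Feed(block, 10); // fire-and-forget, but not swallowing unexpected exceptions

        int received = 0;
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out int _)) received++;
        }
        await block.Completion;
        return received;
    }
}
```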

Theodor Zoulias
  • Thanks for the response. The race condition you pointed out led me to the solution: I was missing a check for the buffer count. After adding it (`while(!t.IsCompleted || buffer.Count !=0)`) everything works as expected. I will also add the transformer you posted. Thanks a lot for the solution. – Pawel Aug 16 '21 at 15:11
  • One major obstacle that stops me from using most of your code is that the number of tasks is not known beforehand in my code. They are provided by another service, so the moment to `Complete` the buffer is unknown. – Pawel Aug 16 '21 at 15:13
  • @Pawel if the count of the tasks is not known beforehand, then how do you know the `taskCount` in your example? I think that the `Task.WhenAll` makes it more difficult to work with an unknown number of tasks, than using an `ActionBlock` or a `TransformBlock`. – Theodor Zoulias Aug 16 '21 at 16:17
  • I simplified my production code for the example. As for the complexity that `WhenAll` introduces, I think the benefit of returning the values as soon as they arrive is worth it. Or maybe I just haven't come up with a better idea :) – Pawel Aug 17 '21 at 07:50
  • I need to return all elements, as soon as they arrive. Without that, I might end up with missing values or a less performant solution that has to wait for all elements to arrive. – Pawel Aug 17 '21 at 07:53
  • @Pawel the `TransformBlock` also propagates the results as soon as they are produced, provided that it is configured with `EnsureOrdered = false`, as shown in my example. I added it in the [second revision](https://stackoverflow.com/posts/68802616/revisions) of my answer, so you may have missed it. – Theodor Zoulias Aug 17 '21 at 08:35
  • Agreed. What about when to complete the buffer? Is that achievable, in your opinion, when the number of tasks is not known? – Pawel Aug 17 '21 at 08:59
  • @Pawel now I see what you mean. In my example feeding the `TransformBlock` with messages must complete before entering the `while` loop that emits the results. I'll update my answer with an improved example soon. – Theodor Zoulias Aug 17 '21 at 09:16
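One possible approach, sketched here rather than taken from the thread: let the feeder pull from an `IAsyncEnumerable` of commands of unknown length and call `Complete()` only when that enumeration ends, while the results loop runs concurrently. `GetCommandsAsync` is a hypothetical stand-in for the other service, `Task.Delay(n % 5)` stands in for the external call, and `System.Threading.Tasks.Dataflow` is assumed to be available:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnknownCountDemo
{
    // Hypothetical source whose length the consumer does not know in advance.
    public static async IAsyncEnumerable<int> GetCommandsAsync(int howMany)
    {
        for (int i = 0; i < howMany; i++)
        {
            await Task.Delay(1);
            yield return i;
        }
    }

    public static async IAsyncEnumerable<int> Exec(IAsyncEnumerable<int> commands)
    {
        var block = new TransformBlock<int, int>(async n =>
        {
            await Task.Delay(n % 5); // stands in for the external service call
            return n;
        }, new ExecutionDataflowBlockOptions { EnsureOrdered = false, MaxDegreeOfParallelism = 5 });

        // Feeder: runs concurrently with the loop below and completes the block
        // only when the source enumeration ends, whatever its length.
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var command in commands)
                {
                    if (!await block.SendAsync(command)) break; // the block has failed
                }
                block.Complete();
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)block).Fault(ex);
            }
        });

        // Emit results as soon as they are produced.
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var item)) yield return item;
        }
        await block.Completion; // propagate a possible exception
    }
}
```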