I was hoping for a clean way to throttle a specific type of producer while the consumer is busy processing, without writing a custom block of my own. I had hoped the code below would do exactly that, but once SendAsync blocks after the capacity limit is hit, its task never completes, suggesting that the postponed message is never consumed.

_block = new TransformBlock<int, string>(async i =>
{
    // Send the next request to own input queue
    // before processing this request, or block
    // while pipeline is full. 
    // Do not start processing if pipeline is full!
    await _block.SendAsync(i + 1);
    // Process this request and pass it on to the
    // next block in the pipeline.
    return i.ToString();
}, 
// TransformBlock block has input and output buffers. Limit both, 
// otherwise requests that cannot be passed on to the next 
// block in the pipeline will be cached in this block's output 
// buffer, never throttling this block.
new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

// This block is linked to the output of the 
// transform block. 
var action = new ActionBlock<string>(async i =>
{
    // Do some very long processing on the transformed element.
    await Task.Delay(1000);
}, 
// Limit buffer size, and consequently throttle previous blocks 
// in the pipeline.
new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
_block.LinkTo(action);

// Start running.
_block.Post(0);

I was wondering if there is any reason why the linked ActionBlock does not consume the postponed message.

  • _a clean solution for throttling a specific type of producer_ Please elaborate on the specific problem you're trying to solve. Your current code cannot pass `i` on to the `ActionBlock` if it cannot post a value back to itself; really, the code seems unnecessary. What problem are you really trying to solve? – JSteward Aug 28 '17 at 22:35
  • You have an infinite loop in the code; how do you think it can complete? – VMAtm Aug 29 '17 at 02:21
  • Apologies, it seems my question was not clear. The code sample is a simplification of the actual code. `TransformBlock` is my infinite producer. I do not expect it to complete, ever. I expect it to keep creating messages to post to itself on the one hand, and to transform them into something else for its consumer (`ActionBlock`) on the other. I wanted to be able to throttle `TransformBlock` in case its consumer is too slow, blocking it when it posts to itself and subsequently delaying messages to the consumer. My problem with the code sample above is that `SendAsync` gets **stuck** after N times. – S. Alexander Aug 29 '17 at 07:59
  • From `SendAsync` on MSDN: "If the target postpones the offered element, the element will be buffered until such time that the target consumes or releases it, at which point the task will complete, with its Result indicating whether the message was consumed. **If the target never attempts to consume or release the message, the returned task will never complete.**" – S. Alexander Aug 29 '17 at 08:07
  • A `TransformBlock` should not be an infinite producer. If you need something just to continuously send values into your pipeline you can use a simple loop with `SendAsync` in the body. That will allow the `BoundedCapacity` of your `ActionBlock` to throttle your producer. – JSteward Aug 29 '17 at 21:44
  • @JSteward, "should not be an infinite producer"? Why? How is it different from an `ActionBlock` as an infinite producer, as suggested here: https://stackoverflow.com/questions/13695499/proper-way-to-implement-a-never-ending-task-timers-vs-task#13712646 – S. Alexander Aug 30 '17 at 09:56
  • That `ActionBlock` invokes an action every 10 seconds and is not throttled. The problem you're having with your code is caused by trying to `SendAsync` to the `TransformBlock` when its buffer is full. The `TransformBlock` in your example is doing nothing that indicates a `TransformBlock` is necessary; a basic loop will suffice and will be easily throttled. To use the answer you've referenced, you would retain the `ActionBlock` as the producer and `SendAsync` from there to your currently linked `ActionBlock`, then post the value back to the producer. – JSteward Aug 30 '17 at 16:03
  • Fundamentally your *consumer* needs to be the source of throttling as only it can announce how much it can handle, by way of its `BoundedCapacity`. Right now you are trying to infer the capacity of the consumer by setting the `BoundedCapacity` on your *producer* which is logically flawed. – JSteward Aug 30 '17 at 16:05
  • @JSteward, thanks for your comments - I've updated my sample code to better illustrate what I am trying to achieve. The `TransformBlock` is needed in the real code because it actually does transform. The real code is a pipeline that consists of many blocks linked together. I did not want to infer the limits of the consumer on the producer, but as you can see from the code sample above - I have no choice, since the producer has an output buffer. – S. Alexander Aug 31 '17 at 19:03
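
The plain-loop approach suggested in the comments above could look roughly like the following. This is only a sketch: the class name `ThrottledLoopSketch`, the method `RunAsync`, the finite `count` and the 10 ms delay are made up for illustration, and the trivial transform stands in for the real work.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledLoopSketch
{
    // Hypothetical helper: pushes `count` integers through a bounded pipeline
    // and returns how many items the consumer processed.
    public static async Task<int> RunAsync(int count, int boundedCapacity)
    {
        var processed = 0;

        // Stand-in for the real transformation step.
        var transform = new TransformBlock<int, string>(
            i => i.ToString(),
            new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Slow consumer. The default MaxDegreeOfParallelism is 1, so the
        // plain increment below is not executed concurrently.
        var consumer = new ActionBlock<string>(async s =>
        {
            await Task.Delay(10); // stand-in for slow work
            processed++;
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        transform.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        // The producer is an ordinary loop outside the blocks. SendAsync
        // waits here whenever the transform block is at capacity, which in
        // turn happens when the consumer falls behind.
        for (var i = 0; i < count; i++)
        {
            await transform.SendAsync(i);
        }

        transform.Complete();
        await consumer.Completion;
        return processed;
    }
}
```

Because the loop does not run inside the block's own delegate, waiting on `SendAsync` does not stop the block from processing the items it already holds. For an endless producer, the `for` loop could be replaced by a `while` loop that checks a `CancellationToken`.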

1 Answer

I faced the same problem as you. I didn't dig deep into the implementation of LinkTo, but I think it propagates messages only when the source block receives one. I mean, there may be a case where the source block has some messages in its input but will not process them until it receives the next Post/SendAsync. And that's your case.

Here is my solution and it's working for me.

First, declare the "engine":

/// <summary>
/// Engine class (like a car engine) that produces a large (or infinite) number of items.
/// </summary>
public class Engine
{
    private BufferBlock<int> _bufferBlock;

    /// <summary>
    /// Creates a source block that produces stub data.
    /// </summary>
    /// <param name="count">Number of items to produce. If count is 0, the loop is infinite.</param>
    /// <param name="boundedCapacity">Bounded capacity (throttling).</param>
    /// <param name="cancellationToken">Cancellation token (used to stop the infinite loop).</param>
    /// <returns>Source block that constantly produces the value 0.</returns>
    public ISourceBlock<int> CreateEngine(int count, int boundedCapacity, CancellationToken cancellationToken)
    {
        _bufferBlock = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

        Task.Run(async () =>
        {
            var counter = 0;
            while (count == 0 || counter < count)
            {
                await _bufferBlock.SendAsync(0);
                if (cancellationToken.IsCancellationRequested)
                    return;
                counter++;
            }
        }, cancellationToken).ContinueWith((task) =>
        {
            _bufferBlock.Complete();
        });

        return _bufferBlock;
    }
}

And then the Producer that uses the engine:

/// <summary>
/// Producer that generates random byte blobs with specified size.
/// </summary>
public class Producer
{
    private static Random random = new Random();

    /// <summary>
    /// Returns a source block that produces byte arrays.
    /// </summary>
    /// <param name="blobSize">Size of byte arrays.</param>
    /// <param name="count">Total count of blobs (if 0 then infinite).</param>
    /// <param name="boundedCapacity">Bounded capacity (throttling).</param>
    /// <param name="cancellationToken">Cancellation token (used to stop infinite loop).</param>
    /// <returns>Source block.</returns>
    public static ISourceBlock<byte[]> BlobsSourceBlock(int blobSize, int count, int boundedCapacity, CancellationToken cancellationToken)
    {
        // Creating engine with specified bounded capacity.
        var engine = new Engine().CreateEngine(count, boundedCapacity, cancellationToken);

        // Creating transform block that uses our engine as a source.
        var block = new TransformBlock<int, byte[]>(
            // Useful work.
            i => CreateBlob(blobSize),
            new ExecutionDataflowBlockOptions
            {
                // Here you can specify your own throttling. 
                BoundedCapacity = boundedCapacity,
                MaxDegreeOfParallelism = Environment.ProcessorCount,
            });
        // Linking engine (and engine is already working at that time).
        engine.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
        return block;
    }

    /// <summary>
    /// Simple random byte[] generator.
    /// </summary>
    /// <param name="size">Array size.</param>
    /// <returns>byte[]</returns>
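    private static byte[] CreateBlob(int size)
    {
        var buffer = new byte[size];
        // Random is not thread-safe, and the TransformBlock above may call
        // this method from several threads at once.
        lock (random)
        {
            random.NextBytes(buffer);
        }
        return buffer;
    }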
}

Now you can use the producer with a consumer (e.g. an ActionBlock):

        // 1 MB blobs, count = 0 (infinite), bounded capacity of 10.
        var blobsProducer = Producer.BlobsSourceBlock(1024 * 1024, 0, 10, cancellationTokenSource.Token);

        var md5Hash = MD5.Create();

        var actionBlock = new ActionBlock<byte[]>(b => 
        {
            Console.WriteLine(GetMd5Hash(md5Hash, b));
        },
        new ExecutionDataflowBlockOptions() { BoundedCapacity = 10 });

        blobsProducer.LinkTo(actionBlock);
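
`GetMd5Hash` is not defined in the snippet above. A minimal sketch of such a helper, assuming it simply hex-encodes the MD5 digest of the blob (the class name `HashHelpers` is hypothetical and not part of the original code), might be:

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class HashHelpers
{
    // Assumed behaviour: returns the MD5 digest of `input` as a
    // lowercase hexadecimal string.
    public static string GetMd5Hash(MD5 md5Hash, byte[] input)
    {
        byte[] hash = md5Hash.ComputeHash(input);

        var builder = new StringBuilder(hash.Length * 2);
        foreach (byte b in hash)
        {
            builder.Append(b.ToString("x2"));
        }
        return builder.ToString();
    }
}
```

With this shape the call in the ActionBlock would read `HashHelpers.GetMd5Hash(md5Hash, b)`, or stay unqualified if the method is placed in the same class as the calling code. Sharing one `MD5` instance is only reasonable here because the ActionBlock runs with its default degree of parallelism of 1.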

Hope it will help you!

xneg