I thought this was a very basic approach, but I haven't found an example of it yet. I have one producer and one consumer, and I want to finish the pipeline once at least x objects have been processed. Additionally, I need to know which objects have been received.

This is how I do it:

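using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class BlockTester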
{
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        _worker = new TransformBlock<int, int>(s => s + s);
        var buffer = new BufferBlock<int>();
        var consumeTask = Consume(buffer);

        _worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});

        foreach (var value in Enumerable.Range(0,100))
        {
            _worker.Post(value);
        }

        _worker.Complete();

        await buffer.Completion;

        if(buffer.TryReceiveAll(out var received))
        {
            Console.WriteLine(string.Join(", ", received));
        }
    }

    public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
    {
        var received = new List<int>();

        while (await buffer.OutputAvailableAsync())
        {
            var current = buffer.Receive();

            received.Add(current);

            if (current > 25)
            {
                _worker.Complete();
            }
        }

        return received;
    }
}

I am a bit confused about buffer.TryReceiveAll. What's the difference between awaiting the consume task and calling TryReceiveAll? Why does TryReceiveAll return false in my scenario? I suspect there's still something wrong with my approach to reaching my goals.

JSteward
BlackMatrix


Your Consume method should be an ActionBlock. There's no need to use OutputAvailableAsync or TryReceiveAll. Replace the BufferBlock with an ActionBlock and do your processing inside it. It's not clear why you would need the TransformBlock either, unless you have more than one step in the process.

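using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class BlockTester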
{
    //Could be removed
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        //Could be removed
        _worker = new TransformBlock<int, int>(s => s + s);
        var processor = new ActionBlock<int>(x => ProcessMessage(x));

        _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in Enumerable.Range(0, 100))
        {
            _worker.Post(value);
        }

        //_worker.Complete();

        await processor.Completion;
    }


    private static int itemsRecieved = 0;
    public static void ProcessMessage(int x)
    {
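        // Interlocked.Increment returns the incremented value; checking that value
        // (instead of re-reading the field) keeps the increment and the check atomic.
        if (Interlocked.Increment(ref itemsRecieved) > 25) _worker.Complete();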
        //process the message
        //log the message etc.
    }
}

Or, collecting complex message objects:

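using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Message { }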

public class BlockTester
{
    //Could be removed
    private static TransformBlock<Message, Message> _worker;

    public static async Task StartAsync()
    {
        //Could be removed
        _worker = new TransformBlock<Message, Message>(s => s);
        var processor = new ActionBlock<Message>(x => ProcessMessage(x));

        _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in Enumerable.Range(0, 100).Select(_ => new Message()))
        {
            _worker.Post(value);
        }

        //_worker.Complete();

        await processor.Completion;
    }


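    private static ConcurrentBag<Message> itemsRecieved = new ConcurrentBag<Message>();
    private static int itemCount = 0;
    public static void ProcessMessage(Message x)
    {
        itemsRecieved.Add(x);
        // Check the threshold against an atomic counter instead of itemsRecieved.Count,
        // so each call sees its own count even if ProcessMessage runs on several threads.
        if (Interlocked.Increment(ref itemCount) > 25) _worker.Complete();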
        //process the message
        //log the message etc.
    }
}
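One caveat with both versions above: `Complete()` only stops `_worker` from accepting new messages. Anything already queued in it is still transformed and passed on, and because the `Post` loop finishes almost immediately, all 100 items will usually be processed anyway. If the goal is to stop the (I/O-bound) transform work once the threshold is reached, one option is to cancel the upstream block through `ExecutionDataflowBlockOptions.CancellationToken`, which makes it drop its queued messages, and to have the consumer ignore anything already in flight. This is a sketch under assumptions: the same `s + s` transform, a hypothetical `CancelOnThreshold.RunAsync(limit)` helper, and a plain `List<int>` (safe here because an `ActionBlock` processes one message at a time by default).

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CancelOnThreshold
{
    // Hypothetical helper: runs the s + s pipeline and keeps only the first `limit` results.
    public static async Task<IReadOnlyList<int>> RunAsync(int limit)
    {
        var cts = new CancellationTokenSource();
        // No lock needed: ActionBlock defaults to MaxDegreeOfParallelism = 1.
        var received = new List<int>();

        // Cancelling this token makes the worker drop whatever is still queued in it.
        var worker = new TransformBlock<int, int>(
            s => s + s,
            new ExecutionDataflowBlockOptions { CancellationToken = cts.Token });

        var processor = new ActionBlock<int>(x =>
        {
            // Items delivered before the cancellation took effect are ignored.
            if (received.Count >= limit) return;
            received.Add(x);
            // Stop the upstream (possibly expensive) transform work.
            if (received.Count == limit) cts.Cancel();
        });

        // A cancelled source still propagates a normal Complete() to its target.
        worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in Enumerable.Range(0, 100))
        {
            worker.Post(value); // returns false once the worker has been cancelled
        }
        worker.Complete(); // needed when fewer than `limit` items are posted

        await processor.Completion;
        return received;
    }
}
```

With `RunAsync(10)` the result should be the first ten doubled values (0 through 18), since both blocks keep message order with default options. Note that awaiting `worker.Completion` directly would throw after a cancellation, which is why the sketch only awaits the `ActionBlock`.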

Edit: To answer the original question:

Why does TryReceiveAll return false?

Because by the time TryReceiveAll runs, the BufferBlock has already completed. A BufferBlock only completes once Complete() has been called on it (here, propagated from _worker) and its output buffer is empty. The Consume method pulled every item out before the block was allowed to complete, so TryReceiveAll was finally called on an empty block. The items you were after are in the list that Consume returns, which is what awaiting consumeTask gives you.
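For readers who want to keep the original `BufferBlock` shape, here is a minimal sketch of that fix. It assumes the same `s + s` transform, a hypothetical static class `BufferBlockFix`, and drops the early `_worker.Complete()` call. It awaits the consumer's task to get the items, and shows that `TryReceiveAll` has nothing left to return once the consumer has drained the buffer. It also swaps `Receive()` for the non-blocking `TryReceive` extension.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockFix
{
    public static async Task<(bool leftoverReceived, IReadOnlyCollection<int> received)> StartAsync()
    {
        var worker = new TransformBlock<int, int>(s => s + s);
        var buffer = new BufferBlock<int>();
        worker.LinkTo(buffer, new DataflowLinkOptions { PropagateCompletion = true });

        var consumeTask = ConsumeAsync(buffer);

        foreach (var value in Enumerable.Range(0, 100))
        {
            worker.Post(value);
        }
        worker.Complete();

        // The consumer's result is where the received items live.
        var received = await consumeTask;

        // The buffer only completes once it is empty, so nothing is left to receive here.
        await buffer.Completion;
        var leftoverReceived = buffer.TryReceiveAll(out _);

        return (leftoverReceived, received);
    }

    private static async Task<IReadOnlyCollection<int>> ConsumeAsync(IReceivableSourceBlock<int> source)
    {
        var received = new List<int>();

        // OutputAvailableAsync returns false once the source has completed and is empty.
        while (await source.OutputAvailableAsync())
        {
            while (source.TryReceive(out var item))
            {
                received.Add(item);
            }
        }

        return received;
    }
}
```

Here `leftoverReceived` should be `false` and `received` should hold all 100 doubled values.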

JSteward
  • Yes, the example is simplified. I have an I/O task in the TransformBlock, and the Consume block is a "do a summary" block. Your approach with Interlocked.Increment may work for primitive values, but I want to collect reference objects and print them at the end of the pipeline. – BlackMatrix Mar 08 '18 at 21:56
  • You can collect objects in a `ConcurrentBag` if you need access from multiple threads, but with default block options you could use a simple list. – JSteward Mar 08 '18 at 21:58
  • I've thought about it some more. So that's the way to go: adding an ActionBlock that "posts" the items to a ConcurrentBag? Edit: Haha, same thoughts ;) – BlackMatrix Mar 08 '18 at 21:59
  • It's a reasonable option for your requirements, but I have a limited view of what you're really trying to accomplish. That being said, Dataflow will do a lot for you out of the box when you have a simple pipeline structure; that's where it excels. – JSteward Mar 08 '18 at 22:02
  • I think I'll use your approach. Could you change the `Interlocked.Increment(ref itemsRecieved);` to a `ConcurrentBag` to collect the objects? Then I'll accept it as the answer. – BlackMatrix Mar 08 '18 at 22:05
  • @BlackMatrix Done – JSteward Mar 08 '18 at 22:10
  • @JSteward Thanks! Could you also explain why TryReceiveAll returns false in my initial case? – BlackMatrix Mar 08 '18 at 22:10
  • Because by the time you run that code, the `BufferBlock` has "completed". For a block to be completed, it must contain 0 items in its output buffer. Your `Consume` method was pulling all the items out before the block was allowed to complete, and you'd finally call `TryReceiveAll` on an empty block. (Added explanation to main answer for readers) – JSteward Mar 08 '18 at 22:22
  • ProcessMessage contains at least one race condition, since adding to itemsRecieved and reading its count are not a single atomic operation. – cubesnyc Jul 09 '20 at 20:06
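On the race condition raised in the last comment: with default options an `ActionBlock` runs one message at a time, so `ProcessMessage` is never called concurrently. It only matters if you raise `MaxDegreeOfParallelism`. In that case, one way to make the threshold check atomic is to act on the value returned by `Interlocked.Increment` instead of reading a count afterwards. A minimal sketch, where the `ParallelThreshold` class and the parallelism of 4 are illustrative assumptions:

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelThreshold
{
    // Returns how many times the threshold branch ran and how many items were kept.
    public static async Task<(int thresholdHits, int kept)> RunAsync(int limit)
    {
        var kept = new ConcurrentBag<int>();
        int processed = 0;
        int thresholdHits = 0;

        var processor = new ActionBlock<int>(x =>
        {
            // Each call gets a distinct value, so there is no "add, then read Count" window.
            int n = Interlocked.Increment(ref processed);
            if (n > limit) return; // ignore items past the limit
            kept.Add(x);
            if (n == limit)
            {
                // True on exactly one thread: a safe spot for a one-time action
                // such as cancelling the upstream block.
                Interlocked.Increment(ref thresholdHits);
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        foreach (var value in Enumerable.Range(0, 100))
        {
            processor.Post(value);
        }
        processor.Complete();
        await processor.Completion;

        return (thresholdHits, kept.Count);
    }
}
```

Because every call sees a unique count, `RunAsync(25)` should report exactly one threshold hit and 25 kept items, regardless of how the four workers interleave.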