3

I apologize in advance for the title but it's the best I could think of to describe the action.

The requirement is processing requests of a message bus. Requests coming in maybe related to an id which correlates or groups these requests. The behavior I want is that for a stream of requests to process correlating ids synchronously. However different ids can be processed asynchronously.

I am using a concurrentdictionary to track the request being processed and the predicate in the linkto.

This is suppose to provide the synchronous processing of related requests.

However the behavior I get is that the first request gets processed and the second request gets dropped.

I have attached the sample code from a console application to simulate the issue.

Any direction or feedback will be appreciated.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
            var requestTracker = new ConcurrentDictionary<string, string>();

            var bufferBlock = new BufferBlock<Request>();

            var actionBlock = new ActionBlock<Request>(x => 
            {
                Console.WriteLine("processing item {0}",x.Name);
                Thread.Sleep(5000);
                string itemOut = null;
                requestTracker.TryRemove(x.Id, out itemOut);
            });

            bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name));


            var publisher = Task.Run(() =>
            {
                var request = new Request("item_1", "first item");
                bufferBlock.SendAsync(request);

                var request_1 = new Request("item_1", "second item");
                bufferBlock.SendAsync(request_1);

            });

            publisher.Wait();
            Console.ReadLine();
        }
    }

    public class Request
    {
        public Request(string id, string name)
        {
            this.Id = id;
            this.Name = name;
        }
        public string Id { get; set; }
        public string Name { get; set; }
    }
}
svick
  • 236,525
  • 50
  • 385
  • 514
rizan
  • 339
  • 1
  • 3
  • 16
  • You should have your exceptions propagate through the pipeline of your dataflow so you can see what went wrong. Take a look at MS's Complete Example at the end of this [MSDN Walkthrough](http://msdn.microsoft.com/en-us/library/hh228604(v=vs.110).aspx). You can then handle the AggregateException to find out what went wrong. – JNYRanger Jul 07 '14 at 20:54
  • 2
    Do you mean you want a group with the same id to be processed one after the other while groups can be processed concurrently? If so there is your answer: http://stackoverflow.com/q/21010024/885318 – i3arnon Jul 07 '14 at 22:43
  • @I3arnon - Your solution seems to be what I am looking for. I'm being a bit lazy here but perhaps do you have more detail. I am guessing the keys you get are dynamic i.e. so basically bursts of messages will have the same key and it's keeping changing. When an action block is busy delegated to process a message it updates a dictionary saying I am busy with this request and any subsequent request matching that key is delegated to that actionblock? Am I correct? – rizan Jul 08 '14 at 06:43
  • 1
    @I3arnon I think that's unnecessarily complicated here. – svick Jul 08 '14 at 13:53
  • @svick what would you suggest? – rizan Jul 08 '14 at 20:12
  • @I3arnon I followed your methodology with a few tweaks and it seems to working nicely - will post a sample. – rizan Jul 08 '14 at 20:13
  • @rizan The keys could be anything really, but in my case were the tcp session id, so I have x actionblocks working in parallel to utilize CPU and I make sure that items with the same id go to the same actionblock. If you're facing a slightly different problem, say how, and we'll figure it out. – i3arnon Jul 08 '14 at 21:46
  • @svick what "feature" would you remove? – i3arnon Jul 08 '14 at 21:46
  • @I3arnon The hashing. You could just have one block for each group (assuming that the fact that this is pretty much a memory leak doesn't matter). – svick Jul 08 '14 at 21:51
  • @svick assuming the groups are not predefined I wouldn't want to dynamically create a block per group, that's costly and could choke performance. That would be like setting MaxDegreeOfParallelism to infinity. – i3arnon Jul 08 '14 at 21:54

2 Answers2

2
  1. You say you want to process some requests in parallel (at least I assume that's what you meant by “asynchronously”), but ActionBlock is not parallel by default. To change that, set MaxDegreeOfParallelism.

  2. You're trying to use TryAdd() as the filter, but that won't work for two reasons:

    1. The filter is invoked only once, it's not automatically retried or anything like that. That means that if an item doesn't go through, it would never go through, even after the item that was blocking it was completed.
    2. If an item is stuck in the output queue of a block, no other items are going to get out of that block. This could significantly reduce the level of parallelism, even if you somehow worked around the previous issue.
  3. I think the simplest solution here would be to have a block for each group, that way, items from each group will be processed sequentially, but items from different groups will be processed in parallel. In code, it could look something like:

    var processingBlocks = new Dictionary<string, ActionBlock<Request>>();
    
    var splitterBlock = new ActionBlock<Request>(request =>
    {
        ActionBlock<Request> processingBlock;
    
        if (!processingBlocks.TryGetValue(request.Id, out processingBlock))
        {
            processingBlock = processingBlocks[request.Id] =
                new ActionBlock<Request>(r => /* process the request here */);
        }
    
        processingBlock.Post(request);
    });
    

    The issue with this approach is that the processing blocks for groups never go away. If you can't afford that (it's a memory leak), because you're going to have a large amount of groups, then the hashing approach suggested by I3arnon is the way to go.

Community
  • 1
  • 1
svick
  • 236,525
  • 50
  • 385
  • 514
  • Thanks for the help svick - actually I ended up using I3arnon approach. Much more robust and less error prone for what I was try to do. – rizan Jul 17 '14 at 13:54
1

I believe this is because your LinkTo() is not set up properly. By having a LinkTo() and passing a function as an argument you are adding in a condition. So this line:

bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id, x.Name));

Is essentially saying, pass data from the bufferBlock to the actionBlock IF you are able to add to your concurrent dictionary, which doesn't necessarily make sense (at least in your sample code)

Instead, you should link your bufferBlock to the actionblock without the lambda, since you don't need conditional linking in this situation (at least I don't think so based on your sample code).

Also, take a look at this SO question to see if you should be using SendAsync() or Post() since Post() can be easier to handle for simply adding data into the pipeline: TPL Dataflow, whats the functional difference between Post() and SendAsync()? . SendAsync will return a task, while Post will return true/false based on success entering the pipeline.

So to essentially find out what's going wrong you need to handle the continuations of your blocks. There is a nice tutorial over at MSDN in their TPL Dataflow introduction over here: Create a DataFlow Pipeline It would essentially look like this:

//link to section
bufferBlock.LinkTo(actionBlock);
//continuations
bufferBlock.Completion.ContinueWith(t =>
{
     if(t.IsFaulted)  ((IDataFlowBlock).actionBlock).Fault(t.Exception); //send the exception down the pipeline
     else actionBlock.Complete(); //tell the next block that we're done with the bufferblock
 });

You can then catch the exception (AggregateException) when waiting for the pipeline. Do you really need to use the concurrentdictionary in your actual code for the tracking, because that could be causing the issue when it fails to add, since when the linkto predicate returns false, it does not pass the data into the next block of the pipeline.

Community
  • 1
  • 1
JNYRanger
  • 6,829
  • 12
  • 53
  • 81
  • Thanks JNYRanger, I will give this a try. The purpose of the TryAdd over the TryGet, was for thread safety. The other was that the concurrentdictionary is optimized for reads. In my case 2 correlating ids where getting past the check and both trying to add. Hence I opted for the TryAdd in this case the one would win the race condition and the other would be out on hold. – rizan Jul 08 '14 at 04:03
  • 1
    @rizan Makes sense, but in order to have everything pass through, you would need to have a second `LinkTo()` to handle the failures adding to the ConcurrentDictionary, otherwise your pipeline will break, which is what you are experiencing. – JNYRanger Jul 08 '14 at 12:55
  • your methodology and the samples you provided work excellent in a setting where the completion can be awaited on. What I did was go with I3arnon's methodology with some slight tweaks. I used TDD to write the tests first so I cover all bases. Thank you so much for the help. – rizan Jul 08 '14 at 20:17
  • @rizan Glad I could help and at least point you in the right direction. I recommend that you answer your own question to show others how you resolved your issue. – JNYRanger Jul 08 '14 at 21:22