I apologize in advance for the title but it's the best I could think of to describe the action.
The requirement is processing requests of a message bus. Requests coming in maybe related to an id which correlates or groups these requests. The behavior I want is that for a stream of requests to process correlating ids synchronously. However different ids can be processed asynchronously.
I am using a concurrentdictionary to track the request being processed and the predicate in the linkto.
This is suppose to provide the synchronous processing of related requests.
However the behavior I get is that the first request gets processed and the second request gets dropped.
I have attached the sample code from a console application to simulate the issue.
Any direction or feedback will be appreciated.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApplication2
{
class Program
{
static void Main(string[] args)
{
var requestTracker = new ConcurrentDictionary<string, string>();
var bufferBlock = new BufferBlock<Request>();
var actionBlock = new ActionBlock<Request>(x =>
{
Console.WriteLine("processing item {0}",x.Name);
Thread.Sleep(5000);
string itemOut = null;
requestTracker.TryRemove(x.Id, out itemOut);
});
bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name));
var publisher = Task.Run(() =>
{
var request = new Request("item_1", "first item");
bufferBlock.SendAsync(request);
var request_1 = new Request("item_1", "second item");
bufferBlock.SendAsync(request_1);
});
publisher.Wait();
Console.ReadLine();
}
}
public class Request
{
public Request(string id, string name)
{
this.Id = id;
this.Name = name;
}
public string Id { get; set; }
public string Name { get; set; }
}
}