I was experimenting with CCR iterators as a solution to a task that requires parallel processing of tons of data feeds, where the data from each feed needs to be processed in order. None of the feeds are dependent on each other, so the in-order processing can be paralleled per-feed.
Below is a quick and dirty mockup with one integer feed, which simply shoves integers into a Port at a rate of about 1.5K/second, and then pulls them out using a CCR iterator to keep the in-order processing guarantee.
class Program
{
static Dispatcher dispatcher = new Dispatcher();
static DispatcherQueue dispatcherQueue =
new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
static Port<int> intPort = new Port<int>();
static void Main(string[] args)
{
Arbiter.Activate(
dispatcherQueue,
Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));
int counter = 0;
Timer t = new Timer( (x) =>
{ for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
, null, 0, 1000);
Console.ReadKey();
}
public static IEnumerator<ITask> ProcessInts()
{
while (true)
{
yield return intPort.Receive();
int currentValue;
if( (currentValue = intPort) % 1000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue, intPort.ItemCount);
}
}
}
}
What surprised me about this greatly was that CCR could not keep up on a Corei7 box, with the queue size growing without bounds. In another test to measure the latency from the Post() to the Receive() under a load or ~100 Post/sec., the latency between the first Post() and Receive() in each batch was around 1ms.
Is there something wrong with my mockup? If so, what is a better way of doing this using CCR?