
I want to execute a query over a stream of data while processing items in parallel with a certain degree of parallelism. Normally, I'd use PLINQ for that, but my work items are not CPU bound but IO bound. I want to use async IO. PLINQ does not support async work.

What's the smartest way of running a PLINQ-style query, but with async work items?


Here's a more detailed illustration of the problem:

My goal is to process a potentially infinite stream of "items" in a way that is logically described by the following query:

var items = new int[10]; // simulate data

var results =
    from x in items.AsParallel().WithDegreeOfParallelism(100)
    where Predicate(x)
    select ComputeSomeValue(x);

foreach (var result in results)
    PerformSomeAction(result);

This query is just a sketch of the real query. Now I want each of the placeholder functions to be asynchronous (returning a Task and internally being based on async IO).

Note that there might be far more items than can be stored in memory. I must also control the degree of parallelism to max out the underlying network and disk hardware.

This question is not about multi-core. It fully applies to machines with only one CPU core because the IO can still benefit from parallelism. Think of slow web-service calls and the like.

usr
  • +1, great question. I wonder if you could leverage completion-ports on the IO side to achieve parallelism? Disclaimer: I've used them a lot in C++, but never C#. – Moo-Juice Jan 16 '14 at 18:32
  • You have said that the "work items are not CPU bound but IO bound". Therefore, a large number of cores & CPU parallelism would not be of too much help. I mean, if CPU usage is low and I/O usage is high regarding those 2 operations, then create n = 10 chained actions (ComputeSomeValue, continued by PerformSomeAction) and start the chains sequentially. *New task(ComputeSomeValue).ContinueWith(...)* etc. – turdus-merula Jan 16 '14 at 20:34
  • You can apply that filter you have using PLINQ (CPU parallelism), but that's all... You have to start tasks for the I/O part... IMHO. – turdus-merula Jan 16 '14 at 20:40
  • Have you looked at TPL Dataflow? Obviously more verbose than raw PLINQ, but seems to fit exactly what you need. If you're awesome, create a set of LINQ bindings for it so that it can be expressed in LINQ syntax. – lobsterism Jan 16 '14 at 21:04
  • Also check out Skeet's blog on Eduasync part 19: ordering by completion, ahead of time... – lobsterism Jan 16 '14 at 21:12
  • All placeholder functions in the example should be asynchronous, even the filter and the final action. I also want to keep this question general so that it is applicable to many situations. Please do not assume too much about what the functions do.; I'm ok with starting tasks myself (no other way to do it). But the query pipeline should be managed because that seems pretty hard. – usr Jan 16 '14 at 21:21
  • @usr, to clarify, the placeholders are: `Predicate`, `ComputeSomeValue`, `PerformSomeAction`, they should be async and return `Task`, right? – noseratio Jan 17 '14 at 04:09
  • @Noseratio yes. I hope that this question will give some general insight and be applicable to all problems with similar "pipeline" structure. – usr Jan 17 '14 at 09:12

2 Answers


This sounds like a job for Microsoft's reactive framework.

I started with this code as my initial variables:

var items = Enumerable.Range(0, 10).ToArray();

Func<int, bool> Predicate = x => x % 2 == 0;

Func<int, int> ComputeSomeValue = x =>
{
    Thread.Sleep(10000);
    return x * 3;
};

First, I used a regular LINQ query as a baseline:

var results =
    from x in items
    where Predicate(x)
    select ComputeSomeValue(x);

This took 50 seconds to compute the following results:

[Image: results of the enumerable query]

Then I switched over to an observable (reactive framework) query:

// ToObservable and Observable.Start live in the System.Reactive.Linq namespace.
var results =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select y;

This took 10 seconds to get:

[Image: results of the observable query]

It's clearly computing in parallel.

However, the results are out of order. So I changed the query to this:

var query =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

var results =
    query
        .ToEnumerable()
        .OrderBy(z => z.x)
        .Select(z => z.y);

That still took 10 seconds, but I got the results back in the correct order.

Now, the only issue here is WithDegreeOfParallelism. There are a couple of things to try.

First, I changed the code to produce 10,000 values with a 10 ms compute time each. My standard LINQ query still took 50 seconds, but the reactive query took 6.3 seconds. If it could perform all the computations at the same time, it should have taken much less. This shows that it is maxing out the asynchronous pipeline.

The second point is that the reactive framework uses schedulers for all of the work scheduling. You could try the variety of schedulers that come with the reactive framework to find an alternative if the built-in one doesn't do what you want. Or you could even write your own scheduler to do whatever scheduling you like.
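As a rough sketch of capping the concurrency without writing a scheduler, the `Merge(maxConcurrent)` overload can be combined with `Observable.FromAsync`. The names `ThrottledRxQuery`, `Build`, `PredicateAsync` and `ComputeSomeValueAsync` are hypothetical stand-ins for the question's placeholders, and the `Task.Delay` calls only simulate IO. This assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;   // from the System.Reactive NuGet package
using System.Threading.Tasks;

public static class ThrottledRxQuery
{
    // Hypothetical async stand-ins for the question's placeholders.
    public static async Task<bool> PredicateAsync(int x)
    {
        await Task.Delay(10);          // simulated IO
        return x % 2 == 0;
    }

    public static async Task<int> ComputeSomeValueAsync(int x)
    {
        await Task.Delay(10);          // simulated IO
        return x * 3;
    }

    public static IObservable<int> Build(IEnumerable<int> items, int maxConcurrent)
    {
        return items
            .ToObservable()
            // One cold observable per item: FromAsync does not start the
            // task until Merge subscribes to it.
            .Select(x => Observable.FromAsync(async () =>
            {
                bool keep = await PredicateAsync(x);
                return (keep: keep, value: keep ? await ComputeSomeValueAsync(x) : 0);
            }))
            // At most maxConcurrent inner observables are subscribed at once.
            .Merge(maxConcurrent)
            .Where(r => r.keep)
            .Select(r => r.value);
    }
}
```

Results arrive in completion order, not source order. One caveat: as far as I know, inner observables that are waiting for a slot are queued in memory, so this alone does not give backpressure for a truly unbounded source.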


Here's a version of the query that computes the predicate in parallel too.

var results =
    from x in items.ToObservable()
    from p in Observable.Start(() => Predicate(x))
    where p
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };
Enigmativity
  • I have not used Rx before. Can the predicate and ComputeSomeValue be made fully async (returning a Task)? In your example that would be Task.Delay. – usr Jan 16 '14 at 23:18
  • You can convert observables to tasks and task to observables using the reactive framework. But in my experience the reactive framework produces far more succinct code than the TPL. I'd suggest doing your computation solely in the reactive framework. – Enigmativity Jan 17 '14 at 00:22
  • @usr - you can also make the call to `Predicate` an observable call, making it also run in parallel. The reactive framework does a nice job of mixing the scheduling of all parts of the query so it is performed efficiently. – Enigmativity Jan 17 '14 at 00:24

As stated here, PLINQ is for running LINQ queries in parallel on multi-core/multi-processor systems. It doesn't have much to do with systems that have a lot of disk units and fast networking. AFAIK, it's made for running executable code on more cores, not for concurrently dispatching multiple I/O requests to the operating system.

Maybe your Predicate(x) is CPU bound, therefore you may perform that filtering operation using PLINQ. But you cannot apply the I/O demanding operations (ComputeSomeValue, PerformSomeAction) in the same way.

What you can do is to define a chain of operations (two in your case) for each item (see continuation tasks) and dispatch that chain (sequentially (?)).
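A minimal sketch of that idea using async/await instead of explicit continuations, with a `SemaphoreSlim` to cap how many chains are in flight. `ThrottledChains` and `RunAsync` are hypothetical names, not part of any library, and the `chain` delegate stands for whatever sequence of async steps is needed per item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledChains
{
    // Runs one async chain per item, with at most maxConcurrent chains in flight.
    public static async Task RunAsync<T>(
        IEnumerable<T> items,
        int maxConcurrent,
        Func<T, Task> chain)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);
        var running = new List<Task>();

        foreach (var item in items)
        {
            // Waits here (without blocking a thread) while maxConcurrent chains are busy.
            await gate.WaitAsync();
            running.Add(RunOneAsync(item));
        }

        // Surfaces any exception thrown by a chain.
        await Task.WhenAll(running);

        async Task RunOneAsync(T item)
        {
            try { await chain(item); }
            finally { gate.Release(); }   // free the slot even if the chain fails
        }
    }
}
```

A chain for the question's pipeline could look like `async x => { if (await PredicateAsync(x)) await PerformSomeActionAsync(await ComputeSomeValueAsync(x)); }`, where those three method names are assumed. Note that this sketch keeps one `Task` per item in the list, so for a very long stream the completed tasks would need to be pruned.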

Also, you have mentioned something about an "infinite stream of items". This sounds a bit like the producer-consumer problem, if those items are also I/O generated.
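One way to sketch a producer-consumer setup with a fixed number of async workers is a bounded channel from `System.Threading.Channels`. `BoundedPipeline` and `RunAsync` are hypothetical names, the buffer size of `workers * 2` is an arbitrary assumption, and error handling is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedPipeline
{
    public static async Task RunAsync<T>(
        IEnumerable<T> source,
        int workers,
        Func<T, Task> processAsync)
    {
        // Bounded: WriteAsync waits when the buffer is full, so the producer
        // cannot run arbitrarily far ahead of the consumers.
        var channel = Channel.CreateBounded<T>(workers * 2);

        // Start a fixed number of consumers; this is the degree of parallelism.
        var consumers = Enumerable.Range(0, workers)
            .Select(_ => ConsumeAsync())
            .ToArray();

        foreach (var item in source)
            await channel.Writer.WriteAsync(item);

        // Signal that no more items are coming so the consumers can finish.
        channel.Writer.Complete();
        await Task.WhenAll(consumers);

        async Task ConsumeAsync()
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
                await processAsync(item);
        }
    }
}
```

Because the source is enumerated lazily and the channel is bounded, only a small window of items is held in memory at a time. A worker that throws simply stops in this sketch, so real code would want cancellation to avoid the producer waiting on a channel nobody reads.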

Maybe your problem is not that multi-core friendly... It may be just I/O demanding, that's all...

turdus-merula
  • CPU load is not significant. But I do require async IO and a very high degree of parallelism to max out the IO. The question fully applies to machines with one CPU core because the IO completions will be multiplexed onto the one core.; I could define a "chain of operators" but there are too many items to kick off all work at once. I need a throttled/guaranteed level of parallelism. – usr Jan 16 '14 at 21:25
  • @usr that sounds exactly like `TPL Dataflow` – i3arnon Jan 16 '14 at 22:38