We already have parallel fan-out working in our code (using ParallelEnumerable), which is currently running on a 12-core server with 64 GB of RAM. But we would like to convert the code to use Rx so that we have more flexibility in our downstream pipeline.
Current Workflow:
1. We read millions of records from a database (in a streaming fashion).
2. On the client side, we then use a custom OrderablePartitioner<T> class to group the database records into groups. Let’s call an instance of this class: partioner.
3. We then use
   partioner.AsParallel().WithDegreeOfParallelism(5).ForAll(group => ProcessGroupOfRecordsAsync(group));
   Note: this could be read as “Process all the groups, 5 at a time in parallel.” (I.e. parallel fan-out.)
4. ProcessGroupOfRecordsAsync() loops through all the records in the group and turns them into hundreds or even thousands of POCO objects for further processing (i.e. serial fan-out or, better yet, expand).
5. Depending on the client’s needs, this new serial stream of POCO objects is evaluated, sorted, ranked, transformed, filtered, manually filtered, and possibly fanned out further (in parallel and/or serially) throughout the rest of the pipeline.
6. The end of the pipeline may end up storing new records in the database, displaying the POCO objects in a form, or displaying them in various graphs.
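To make the parallel fan-out and expand steps concrete, here is a minimal, self-contained sketch. It is not our real code: Partitioner.Create stands in for the custom OrderablePartitioner<T>, the records are plain ints, and ProcessGroup is a hypothetical, synchronous stand-in for ProcessGroupOfRecordsAsync() (kept synchronous so that ForAll only returns once every group is done).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class PlinqFanOutSketch
{
    // Hypothetical stand-in for ProcessGroupOfRecordsAsync(): expands one
    // group of records into result objects (the serial fan-out / expand step).
    public static IEnumerable<string> ProcessGroup(IEnumerable<int> group)
    {
        return group.Select(record => "poco-" + record);
    }

    public static ConcurrentBag<string> Run(IEnumerable<int[]> groups)
    {
        var results = new ConcurrentBag<string>();

        // Assumption: the built-in partitioner replaces the custom
        // OrderablePartitioner<T> described above.
        var partitioner = Partitioner.Create(groups);

        // Parallel fan-out: process the groups, at most 5 at a time.
        partitioner.AsParallel()
            .WithDegreeOfParallelism(5)
            .ForAll(group =>
            {
                foreach (var poco in ProcessGroup(group))
                    results.Add(poco);
            });

        return results;
    }
}
```

Collecting into a ConcurrentBag<string> is only there to make the sketch observable from the outside; in the real pipeline each POCO is handed to the downstream steps instead.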
The process currently works just fine, except that points #5 and #6 aren’t as flexible as we would like. We need the ability to swap various downstream workflows in and out. So, our first attempt was to use a Func<Tin, Tout> like so:
partioner.AsParallel()
    .WithDegreeOfParallelism(5)
    .ForAll(group => ProcessGroupOfRecordsAsync(group, singleRecord =>
        NextTaskInWorkFlow(singleRecord)));
And that works okay, but the more we fleshed out our needs, the more we realized we were just re-implementing Rx.
Therefore, we would like to do something like the following in Rx:
IObservable<recordGroup> rg = dbContext.QueryRecords(inputArgs)
    .AsParallel().WithDegreeOfParallelism(5)
    .ProcessGroupOfRecordsInParallel();

if (client1)
    rg.AnalizeRecordsForClient1().ShowResults();

if (client2)
    rg.AnalizeRecordsForClient2()
        .AsParallel()
        .WithDegreeOfParallelism(3)
        .MoreProcessingInParallel()
        .DisplayGraph()
        .GetUserFeedBack()
        .Where(data => data.SaveToDatabase)
        .Select(data => data.NewRecords)
        .SaveToDatabase(Table2);
...
using (rg.Subscribe(groupId => LogToScreen("Group {0} finished.", groupId)));
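For comparison, here is a minimal sketch of how the "N groups at a time" fan-out might look using operators that do exist in Rx (System.Reactive): each group is wrapped in a deferred async observable, and Merge(maxConcurrent) limits how many run at once. Everything else is an assumption for illustration: the records are ints, ProcessGroupAsync is a hypothetical stand-in for ProcessGroupOfRecordsAsync(), and the groups arrive as a plain IEnumerable rather than from our partitioner.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class RxFanOutSketch
{
    // Hypothetical stand-in for ProcessGroupOfRecordsAsync(): expands one
    // group of records into many result objects.
    public static async Task<IList<string>> ProcessGroupAsync(IEnumerable<int> group)
    {
        await Task.Yield(); // simulate asynchronous work
        return group.Select(record => "poco-" + record).ToList();
    }

    public static IObservable<string> BuildPipeline(
        IEnumerable<IEnumerable<int>> groups, int maxConcurrent)
    {
        return groups
            .ToObservable()
            // FromAsync defers the work: nothing runs until Merge subscribes.
            .Select(group => Observable.FromAsync(() => ProcessGroupAsync(group)))
            // Parallel fan-out: at most maxConcurrent groups in flight.
            .Merge(maxConcurrent)
            // Serial fan-out (expand): flatten each group's results.
            .SelectMany(pocos => pocos);
    }
}
```

Because BuildPipeline returns an ordinary IObservable<string>, downstream workflows can be swapped by chaining different operators onto it (for example BuildPipeline(groups, 5).Where(...).Subscribe(...)). Results from different groups may interleave in any order.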