I'd like to know if the following approach is a good way to implement a producer/consumer pattern in C# on .NET 4.6.1.

Description of what I want to do:
I want to read files, perform calculations on the data they contain, and save the results. Each file has an origin (a device, e.g. a data logger), and depending on that origin, different calculations and output formats should be used. A file contains different values, e.g. temperature readings from several sensors. It is important that the calculations have state. For instance, this could be the last value of the previous calculation, e.g. if I want to sum all values of one origin. I want to parallelize the processing per origin. All files from one origin need to be processed sequentially (more specifically, chronologically) and cannot be parallelized.

I think TPL Dataflow might be an appropriate solution for this.

This is the process I came up with:
The reading would be done by a TransformBlock. Next, I would create instances of the classes that perform the operations on the data, one per origin. They get initialized with the necessary parameters so that they know how to process the files of their origin. Then I would create a TransformBlock for each of these objects (so basically one per origin). Each TransformBlock would execute a function of the corresponding object. The TransformBlock reading the files would be linked to a BufferBlock, which in turn is linked to each per-origin processing TransformBlock. The linking would be conditional, so that each processing TransformBlock only receives the data meant for its origin. The output of the processing blocks would be linked to an ActionBlock that writes the output files.
MaxDegreeOfParallelism is set to 1 for every block.
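
Roughly, the pipeline described above could be sketched as follows. This is only an illustration under assumptions: `FileData`, `OriginProcessor` and `PipelineSketch` are hypothetical names, the "reading" block is a pass-through stand-in for real file reading, the "writer" just collects strings instead of writing files, and the capacity of 16 is an arbitrary value. On .NET 4.6.1 the `System.Threading.Tasks.Dataflow` NuGet package is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical container for the content of one file.
public sealed class FileData
{
    public string Origin { get; set; }
    public long[] Values { get; set; }
}

// Hypothetical stateful calculation: keeps a running sum per origin.
public sealed class OriginProcessor
{
    private long runningSum;
    public string Origin { get; private set; }
    public OriginProcessor(string origin) { Origin = origin; }

    public string Process(FileData file)
    {
        runningSum += file.Values.Sum();
        return Origin + ": " + runningSum;
    }
}

public static class PipelineSketch
{
    public static async Task<List<string>> RunAsync(
        IEnumerable<FileData> files, IEnumerable<string> origins)
    {
        var originList = origins.ToList();
        var known = new HashSet<string>(originList);
        var written = new List<string>();

        // One worker per block; bounded queues so memory cannot grow without limit.
        var sequential = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 16
        };
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        // Stand-in for the reading block (real code would load the file here).
        var reader = new TransformBlock<FileData, FileData>(f => f, sequential);
        var buffer = new BufferBlock<FileData>(
            new DataflowBlockOptions { BoundedCapacity = 16 });
        // Stand-in for the writing block (real code would write output files).
        var writer = new ActionBlock<string>(line => written.Add(line), sequential);

        reader.LinkTo(buffer, propagate);

        var processors = new List<TransformBlock<FileData, string>>();
        foreach (var origin in originList)
        {
            var state = new OriginProcessor(origin);
            var block = new TransformBlock<FileData, string>(
                f => state.Process(f), sequential);
            // Conditional link: only files of this origin reach this block.
            buffer.LinkTo(block, propagate, f => f.Origin == state.Origin);
            // The writer has many sources, so it is completed manually below.
            block.LinkTo(writer);
            processors.Add(block);
        }

        // Files of unknown origin are dropped so they cannot stall the buffer.
        buffer.LinkTo(DataflowBlock.NullTarget<FileData>(),
                      f => !known.Contains(f.Origin));

        foreach (var file in files)
        {
            await reader.SendAsync(file); // waits while the reader queue is full
        }
        reader.Complete();

        await Task.WhenAll(processors.Select(p => p.Completion));
        writer.Complete();
        await writer.Completion;
        return written;
    }
}
```

Because the reading block, the BufferBlock and each per-origin block all preserve order, the files of one origin are processed in the order in which they were sent, while different origins run in parallel.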

Is that a viable solution? I thought about implementing this with Tasks and the BlockingCollection, but it seems this would be the easier approach.

Additional Information:

The files to be processed may be too large in size or number to be loaded all at once. Reading and writing should happen concurrently with the processing. As I/O takes time, and because data needs to be collected after processing to form an output file, buffering is essential.

Community
  • If this is more suitable for programmers.stackexchange.com feel free to migrate. –  Mar 24 '16 at 14:41
  • When do you find out about the origin? Is it known immediately or only after reading the file? – usr Mar 24 '16 at 15:06
  • Conditional linking has the problem that its cost is proportional to the number of origins. – usr Mar 24 '16 at 15:08
  • @usr the "origin" is known before reading the file through the location and filename. –  Mar 24 '16 at 15:09
  • @usr is it that bad for performance? What would be an alternative? –  Mar 24 '16 at 15:10
  • How many files are there? Is it feasible to detect all files at the beginning of the process? – usr Mar 24 '16 at 15:11
  • @usr The amount of files can vary greatly. Yes it would be possible to detect all files before processing (amount of files and size). –  Mar 24 '16 at 15:13
  • `The amount of files processed may be to large in size or number to be loaded at once.` So there are tens of millions of files? You only need to load the metadata, not the contents. – usr Mar 24 '16 at 15:35
  • @usr that would be unlikely. Okay, if it only needs the metadata, that would be good. I'm new to PLINQ, so I will need to verify whether your solution is what I need. –  Mar 24 '16 at 15:38
  • Sure. Report back if you need anything else. – usr Mar 24 '16 at 15:40

1 Answer

Since the origins are independent of each other and the items within each origin are fully dependent on each other, this problem has an easy solution:

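// Requires: using System.Collections.Concurrent; using System.Linq;

// Group the files by origin, largest groups first.
var origins = from f in files
              group f by f.origin into g
              orderby g.Count() descending
              select g;

var results =
    // NoBuffering is what actually disables chunking, so groups are handed out one at a time.
    Partitioner.Create(origins, EnumerablePartitionerOptions.NoBuffering)
        .AsParallel()
        .AsOrdered() // try to process the biggest groups first
        .Select(originGroup =>
        {
            // Files of one origin are processed sequentially, in order.
            foreach (var x in originGroup.OrderBy(...)) Process(x);
            return someResult;
        })
        .ToList();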

This processes the files of each origin sequentially and the origins in parallel.

If you need to limit IO in some way, you can throw in a SemaphoreSlim to guard the IO paths.
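
A minimal sketch of such a guard, assuming a small helper class (`IoThrottle` is a hypothetical name, and the limit passed to it is something you would have to tune for your storage):

```csharp
using System;
using System.Threading;

// Hypothetical helper: lets at most maxConcurrentIo callers run an IO operation at once.
public sealed class IoThrottle
{
    private readonly SemaphoreSlim gate;

    public IoThrottle(int maxConcurrentIo)
    {
        gate = new SemaphoreSlim(maxConcurrentIo, maxConcurrentIo);
    }

    public T Run<T>(Func<T> ioOperation)
    {
        gate.Wait(); // blocks while the limit is reached
        try
        {
            return ioOperation();
        }
        finally
        {
            gate.Release(); // always free the slot, even if the operation throws
        }
    }
}
```

Inside the `Select` you could then share one instance across all origins and wrap each read, e.g. `var text = throttle.Run(() => File.ReadAllText(path));`, so the CPU-bound part stays parallel while IO is limited globally.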

usr
  • Within the originGroup there is an order (chronological), so I think you should add an OrderBy. – paparazzo Mar 24 '16 at 15:21
  • Well, that seems like an easy solution. I would manage the file output through the BlockingCollection, correct? –  Mar 24 '16 at 15:26
  • @Paparazzi having concurrent I/O and not flooding memory? –  Mar 24 '16 at 15:33
  • I added a way you can use to easily collect some output. PLINQ is awesome for data flow pipelines. You can also write to files directly from the Select. – usr Mar 24 '16 at 15:34
  • @John but originGroup is processed sequentially, and you are the one who asked how to parallelize origins. Sorry I could not be of help. – paparazzo Mar 24 '16 at 15:41
  • OK, as far as I understand this, it would not be optimal for I/O. PLINQ would parallelize the processing per "origin" chunk, but reading, processing and writing would be entirely sequential per origin. Therefore I would need to create a pipeline within "Process(x)". So your answer is a nice idea, but I don't think it simplifies my problem. –  Mar 24 '16 at 16:16
  • You can process each origin however you like. The processing can use an internal pipeline. At this point it might be easier to use TPL Dataflow, yes. On the other hand, you probably want only one IO task at a time, even across origins, and N CPU-bound tasks, also across origins. This cross-origin throttling will not be accomplished by Dataflow. You will need to use (e.g.) SemaphoreSlim to accomplish that anyway. – usr Mar 24 '16 at 16:24
  • @usr with TPL you would have one Task for reading, n Tasks for the per-data-logger processing and one Task for writing. The crossing would be minimal. The throttling is achieved through the internal use of BlockingCollections by the TPL, configured through MaxMessage. If one producer fills the queue too much, it will be blocked until there is space; no need for semaphores. –  Mar 24 '16 at 16:31
  • @usr the origins will be <1000. Most likely around 200. –  Mar 24 '16 at 16:32