In my application I have three classes, Extractor, Transformer and Loader, which are coordinated by a fourth class, Coordinator. Extractor, Transformer and Loader are very simple and do the following:

Extractor

Exposes a member called Results of type IEnumerable<string>, populated, for example, by reading from a text file. Extraction should be synchronous.

Transformer

Exposes a member called Transform which accepts a string and transforms it into another string via a process that is expected to be time-consuming (so this step should use parallel processing).

Loader

Exposes a member called Load which accepts a string and loads it into some final form (e.g. another text file). Loading should be synchronous.
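To make the coordination code easier to try out, here is a minimal sketch of the three classes as described. The bodies are hypothetical placeholders, not the original implementations: Extractor returns a fixed list (a real version could return File.ReadLines(path), which reads lines lazily and synchronously), Transformer simulates slow work with Thread.Sleep and upper-cases its input, and Loader collects items in memory, exposed through a hypothetical Loaded property, instead of writing a file.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in: a fixed list replaces reading lines from a text file.
public class Extractor
{
    public IEnumerable<string> Results { get; } = new[] { "alpha", "beta", "gamma" };
}

// Hypothetical stand-in for a slow transformation.
public class Transformer
{
    public string Transform(string input)
    {
        Thread.Sleep(100); // placeholder for the time-consuming work
        return input.ToUpperInvariant();
    }
}

// Hypothetical stand-in: records loaded items in memory instead of a file.
public class Loader
{
    private readonly List<string> _loaded = new List<string>();

    // Not thread-safe: assumes Load is called from a single thread.
    public void Load(string output) => _loaded.Add(output);

    public IReadOnlyList<string> Loaded => _loaded;
}
```

Because the Coordinator calls loader.Load from a single thread, the in-memory list in this sketch needs no locking.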

The Coordinator class coordinates the three operations. The transformation should run in parallel and push its results to a queue that is read by the loader. Coordinator's Run() method looks like this:

Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();

ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

Parallel.ForEach(extractor.Results, x => outputs.Enqueue(transformer.Transform(x)));

foreach(string output in outputs)
{
  loader.Load(output);
}

This works, except that ALL transformations must finish before any loading can start: Parallel.ForEach() only returns once every item has been processed, so the foreach that follows cannot begin until then. I would prefer each output to be passed to the loader as soon as it is ready.

I also tried this:

Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();

ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

foreach (string input in extractor.Results)
{
  string input1 = input;
  Task task = Task.Factory.StartNew(
                    () => outputs.Enqueue(transformer.Transform(input1)));
}

foreach(string output in outputs)
{
  loader.Load(output);
}

But then the foreach loop at the bottom is reached before any outputs have been added to the queue, so it simply exits.
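The loop exits because enumerating a ConcurrentQueue<T> does not wait for future items: per the .NET documentation, GetEnumerator returns a moment-in-time snapshot of the queue's contents. This small, deterministic illustration (SnapshotDemo and CountVisited are hypothetical names) enqueues an item while the loop is running, and the loop never sees it.

```csharp
using System.Collections.Concurrent;

public static class SnapshotDemo
{
    // Counts how many items a foreach over a ConcurrentQueue visits when
    // another item is enqueued while the enumeration is in progress.
    public static int CountVisited()
    {
        var queue = new ConcurrentQueue<string>();
        queue.Enqueue("first");

        int visited = 0;
        foreach (string item in queue)
        {
            visited++;
            // Added after GetEnumerator() took its snapshot, so the loop
            // will not visit it (and, unlike List<T>, no exception is thrown).
            queue.Enqueue("late");
        }
        return visited; // 1: the loop ends when the snapshot is exhausted
    }
}
```

In the second attempt the snapshot is taken while the tasks are still running, typically when the queue is still empty, so the loop ends immediately and later outputs are never loaded. A collection whose consumer blocks until the producer signals completion avoids this.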

How do I get loading to happen as soon as results are available from the calls to transformer.Transform()?

David
    Look into the Producer/Consumer pattern. It's old now, but here's a simple C# example: http://stackoverflow.com/questions/1656404/c-sharp-producer-consumer – Joel Coehoorn Oct 03 '12 at 16:04

1 Answer

Try a BlockingCollection together with Parallel.Invoke instead. In the example below, the loop over GetConsumingEnumerable (the consumer part of the producer-consumer pattern) won't finish until CompleteAdding is called, so load keeps running until fill has completed, loading each output as soon as it is added.

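// Inside Run(); needs System.Collections.Concurrent and System.Threading.Tasks.
var outputs = new BlockingCollection<string>();

// aka Producer
Action fill = () => {
    Parallel.ForEach(extractor.Results, x => outputs.Add(transformer.Transform(x)));
    outputs.CompleteAdding(); // lets GetConsumingEnumerable() end once drained
};

// aka Consumer
Action load = () => {
    // Blocks while the collection is empty; yields each item as soon as it is added.
    foreach (var o in outputs.GetConsumingEnumerable())
        loader.Load(o);
};

Parallel.Invoke(fill, load);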
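The same approach can be wrapped into a self-contained, runnable sketch. Here the extractor, transformer and loader are replaced by hypothetical parameters (an input sequence and a transform delegate), and loaded items are collected into a list so the result can be checked; PipelineSketch and Run are illustrative names. One deliberate deviation from the code above: CompleteAdding is called in a finally block, so if a transform throws, the consumer's loop still ends instead of blocking forever, and Parallel.Invoke can surface the exception.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Parallel transform (producer) feeding a single sequential loader
    // (consumer); each output is loaded as soon as its transform finishes.
    public static List<string> Run(IEnumerable<string> inputs, Func<string, string> transform)
    {
        var loaded = new List<string>(); // only touched by the single consumer

        using (var outputs = new BlockingCollection<string>())
        {
            Action fill = () =>
            {
                try
                {
                    Parallel.ForEach(inputs, x => outputs.Add(transform(x)));
                }
                finally
                {
                    // Always signal completion, even if a transform throws,
                    // so the consumer loop below can exit.
                    outputs.CompleteAdding();
                }
            };

            Action load = () =>
            {
                // Blocks while empty; ends after CompleteAdding and once drained.
                foreach (var o in outputs.GetConsumingEnumerable())
                    loaded.Add(o); // stand-in for loader.Load(o)
            };

            Parallel.Invoke(fill, load);
        }

        return loaded;
    }
}
```

Items are not guaranteed to be loaded in input order, because transforms finish at different times. If the loader can fall behind, BlockingCollection<T> also has a constructor that takes a boundedCapacity, which makes Add block once that many items are waiting.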
Austin Salonen