In my application I have three classes, `Extractor`, `Transformer` and `Loader`, which are coordinated by a fourth class, `Coordinator`. `Extractor`, `Transformer` and `Loader` are very simple and do the following:
- `Extractor`: exposes a member called `Results` of type `IEnumerable<string>`, which it produces, for example, by reading from a text file. Extraction should be synchronous.
- `Transformer`: exposes a member called `Transform` which accepts a string and transforms it into another string via some process that is expected to be time-consuming, so this is where parallel processing should be used.
- `Loader`: exposes a member called `Load` which accepts a string and loads it into some final form (e.g. another text file). Loading should be synchronous.
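To pin down the contract of the three roles, here is a minimal sequential sketch. The local functions `ExtractResults`, `Transform` and `Load`, the in-memory input and the `Thread.Sleep` delay are hypothetical stand-ins for the real classes, not their actual implementations.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for Extractor.Results: a lazy, synchronous sequence.
// A real implementation might return File.ReadLines(path) instead.
IEnumerable<string> ExtractResults()
{
    foreach (var line in new[] { "alpha", "beta", "gamma" })
        yield return line;
}

// Hypothetical stand-in for Transformer.Transform: slow, CPU-bound work.
string Transform(string input)
{
    Thread.Sleep(50); // simulate the expensive transformation
    return input.ToUpperInvariant();
}

// Hypothetical stand-in for Loader.Load: a synchronous sink that records its input.
var loaded = new List<string>();
void Load(string output) => loaded.Add(output);

// Sequential baseline: extract, transform and load one item at a time.
foreach (var input in ExtractResults())
    Load(Transform(input));

Console.WriteLine(string.Join(",", loaded)); // ALPHA,BETA,GAMMA
```

Everything in this baseline is sequential; the question is how to parallelise only the middle step while keeping extraction and loading synchronous.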
The `Coordinator` class coordinates the three operations. The transformation should run in parallel and push its results to a queue that is read by the loader. `Coordinator`'s `Run()` method looks like this:
```csharp
Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();

ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

Parallel.ForEach(extractor.Results, x => outputs.Enqueue(transformer.Transform(x)));

foreach (string output in outputs)
{
    loader.Load(output);
}
```
This is working well, except that ALL transformation must be finished before any loading can be done, i.e. `Parallel.ForEach()` completes before the following `foreach` starts. I would prefer that each output is passed to the loader as soon as it is ready.
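`Parallel.ForEach` blocks its caller until every iteration has finished. One common way around this is a producer/consumer pipeline built on `BlockingCollection<T>`: the parallel transformation runs on a background task and adds results, while the calling thread consumes them with `GetConsumingEnumerable()` as they arrive. The sketch below assumes hypothetical stand-ins (`results`, `Transform`, `Load`) for the three classes, and the bounded capacity of 100 is an arbitrary illustrative value.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for Extractor.Results, Transformer.Transform and Loader.Load.
IEnumerable<string> results = new[] { "a", "b", "c", "d", "e" };
string Transform(string s) { Thread.Sleep(20); return s + s; }
var loaded = new List<string>();
void Load(string s) => loaded.Add(s);

// The bound is optional; it stops the transformers from running far ahead of a slow loader.
using var outputs = new BlockingCollection<string>(boundedCapacity: 100);

// Producer: run the parallel transformation on a background task so that
// the loading loop below can start immediately.
Task producer = Task.Run(() =>
{
    try
    {
        Parallel.ForEach(results, x => outputs.Add(Transform(x)));
    }
    finally
    {
        // Signal "no more items", even if a transformation threw.
        outputs.CompleteAdding();
    }
});

// Consumer: blocks while the collection is empty, and ends once CompleteAdding
// has been called and every remaining item has been taken.
foreach (string output in outputs.GetConsumingEnumerable())
{
    Load(output); // loading stays on a single thread, i.e. synchronous
}

producer.Wait(); // rethrows any exception from the transformations
Console.WriteLine(loaded.Count); // 5
```

Items are loaded in the order their transformations complete, not in input order, which is also true of the `ConcurrentQueue` version.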
I also tried this:
```csharp
Extractor extractor = new Extractor();
Transformer transformer = new Transformer();
Loader loader = new Loader();

ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

foreach (string input in extractor.Results)
{
    string input1 = input;
    Task task = Task.Factory.StartNew(
        () => outputs.Enqueue(transformer.Transform(input1)));
}

foreach (string output in outputs)
{
    loader.Load(output);
}
```
But then the `foreach` loop at the bottom gets hit before any outputs have been added to the queue, and so it simply exits.
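The early exit follows from how `ConcurrentQueue<T>` is enumerated: its `GetEnumerator` is documented to return a moment-in-time snapshot, so a `foreach` over it visits only the items present when enumeration starts and never waits for more (and the tasks in the second attempt are never awaited). A small demonstration, with illustrative values:

```csharp
using System;
using System.Collections.Concurrent;

var queue = new ConcurrentQueue<string>();
queue.Enqueue("first");
queue.Enqueue("second");

int visited = 0;
foreach (string item in queue)
{
    visited++;
    queue.Enqueue(item + "-late"); // enqueued during enumeration: not in the snapshot
}

Console.WriteLine(visited);     // 2
Console.WriteLine(queue.Count); // 4

// An empty queue, like the one in the second attempt before any task has
// finished, yields nothing, so the loop ends immediately.
var empty = new ConcurrentQueue<string>();
int visitedEmpty = 0;
foreach (string item in empty) visitedEmpty++;
Console.WriteLine(visitedEmpty); // 0
```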
How do I get loading to happen as soon as results are available from the calls to `transformer.Transform()`?
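An alternative sketch keeps the one-task-per-input shape of the second attempt but replaces the queue with a `System.Threading.Channels` channel (built into .NET Core 3.0 and later). The reader's `ReadAllAsync()` yields each item as soon as it is written and finishes only after the writer is marked complete, which happens once every transform task has finished. The names `results`, `Transform` and `Load` are hypothetical stand-ins. One task per input does not throttle concurrency the way `Parallel.ForEach` does, so this is a design choice to weigh for very large inputs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-ins for Extractor.Results, Transformer.Transform and Loader.Load.
IEnumerable<string> results = new[] { "x", "y", "z" };
string Transform(string s) { Thread.Sleep(20); return s.ToUpperInvariant(); }
var loaded = new List<string>();
void Load(string s) => loaded.Add(s);

// Many writers (the transform tasks), one reader (the loader).
Channel<string> channel = Channel.CreateUnbounded<string>(
    new UnboundedChannelOptions { SingleReader = true });

// One transform task per input, but this time the tasks are kept.
Task[] transforms = results
    .Select(input => Task.Run(() => { channel.Writer.TryWrite(Transform(input)); }))
    .ToArray();

// Complete the writer once every transform has finished (or failed).
Task producer = Task.Run(async () =>
{
    try { await Task.WhenAll(transforms); }
    finally { channel.Writer.Complete(); }
});

// Each output is loaded as soon as it is written; the loop ends after Complete().
await foreach (string output in channel.Reader.ReadAllAsync())
{
    Load(output);
}

await producer; // rethrows a transform failure, if any
Console.WriteLine(loaded.Count); // 3
```

As with the `ConcurrentQueue` version, outputs are loaded in completion order rather than input order.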