
I am educating myself on Parallel.Invoke, and parallel processing in general, for use in a current project. I need a push in the right direction to understand how you can dynamically/intelligently allocate more parallel 'threads' as required.

As an example, say you are parsing large log files. This involves reading from the file, some sort of parsing of the returned lines, and finally writing to a database.

So to me this is a typical problem that can benefit from parallel processing.

As a simple first pass, the following code implements this.

Parallel.Invoke(
  ()=> readFileLinesToBuffer(),
  ()=> parseFileLinesFromBuffer(),
  ()=> updateResultsToDatabase()    
);

Behind the scenes:

  1. readFileLinesToBuffer() reads each line and stores it to a buffer.
  2. parseFileLinesFromBuffer() consumes lines from that buffer and, say, puts its results on a second buffer, which updateResultsToDatabase() in turn consumes.

So the code shown assumes that each of the three steps uses the same amount of time/resources. But let's say parseFileLinesFromBuffer() is a long-running process, so instead of running just one instance of that method you want to run two in parallel.

How can you have the code intelligently decide to do this based on any bottlenecks it might perceive?

Conceptually I can see how some approach of monitoring the buffer sizes might work: spawning a new 'thread' to consume a buffer at an increased rate, for example. But I figure this type of issue has already been considered in putting together the TPL.

Some sample code would be great, but I really just need a clue as to which concepts I should investigate next. It looks like maybe System.Threading.Tasks.TaskScheduler holds the key?

SleepyBoBos
  • With async/await, I would say just write the 'normal' sequential/imperative-style code and just make sure you're doing async calls and not any blocking calls. Another approach (although potentially overkill for this) is to use TPL Dataflow, where you can set up processing 'blocks' and then just wire them up. http://msdn.microsoft.com/en-us/devlabs/gg585582.aspx – James Manning May 31 '12 at 05:14
  • Does order of elements matter to you? – svick May 31 '12 at 07:16
  • No, order is not important. I figure as long as I get the lines from files into a database I can do any ordering in queries done via other 'applications' eg reports or web app. _I am busily looking at Rx based on suggestion below (but all suggestions so far have been useful in my learning...ca-chink...ca-chink..as the pieces fall into place)_ – SleepyBoBos Jun 01 '12 at 03:33
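The TPL Dataflow approach mentioned in the comments can be sketched roughly as follows. This is only a sketch, assuming the System.Threading.Tasks.Dataflow NuGet package; the parsing and saving lambdas are hypothetical stand-ins for real work, and the input is simulated rather than read from a file.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks.Dataflow;

class DataflowSketch
{
    static void Main()
    {
        // Parsing is the bottleneck, so let up to four parsers run at once.
        // BoundedCapacity throttles the producer whenever the parsers fall
        // behind, which is the "monitor the buffer" behaviour done for you.
        var parse = new TransformBlock<string, string>(
            line => "parsed " + line,                     // stand-in for real parsing
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity = 100
            });

        var save = new ActionBlock<string>(
            parsed => Console.WriteLine(parsed),          // stand-in for the database write
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

        parse.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var line in Enumerable.Range(1, 10).Select(i => "line " + i))
            parse.SendAsync(line).Wait();                 // blocks while the buffer is full

        parse.Complete();        // no more input; completion flows through to 'save'
        save.Completion.Wait();
    }
}
```

Note that SendAsync is used instead of Post: Post returns false immediately when a bounded block is full, whereas awaiting (or here, blocking on) SendAsync waits for space.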

3 Answers


Have you tried the Reactive Extensions?

http://msdn.microsoft.com/en-us/data/gg577609.aspx

Rx is a new technology from Microsoft. Its focus, as stated on the official site:

The Reactive Extensions (Rx)... ...is a library to compose asynchronous and event-based programs using observable collections and LINQ-style query operators.

You can download it as a NuGet package:

https://nuget.org/packages/Rx-Main/1.0.11226

Since I am currently learning Rx, I wanted to take this example and just write code for it. The code I ended up with is not actually executed in parallel, but it is completely asynchronous, and it guarantees the source lines are processed in order.

Perhaps this is not the best implementation, but like I said, I am learning Rx (making it thread-safe would be a good improvement).

This is a DTO that I am using to return data from the background threads:

class MyItem
{
    public string Line { get; set; }
    public int CurrentThread { get; set; }
}

These are the basic methods doing the real work. I am simulating the work with a simple Thread.Sleep, and I am returning the thread used to execute each method via Thread.CurrentThread.ManagedThreadId. Note that the delay in ProcessLine is 4 seconds; it is the most time-consuming operation.

private IEnumerable<MyItem> ReadLinesFromFile(string fileName)
{
    var source = from e in Enumerable.Range(1, 10)
                 let v = e.ToString()
                 select v;

    foreach (var item in source)
    {
        Thread.Sleep(1000);
        yield return new MyItem { CurrentThread = Thread.CurrentThread.ManagedThreadId, Line = item };
    }
}

private MyItem UpdateResultToDatabase(string processedLine)
{
    Thread.Sleep(700);
    return new MyItem { Line = "s" + processedLine, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

private MyItem ProcessLine(string line)
{
    Thread.Sleep(4000);
    return new MyItem { Line = "p" + line, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

The following method is used just to update the UI:

private void DisplayResults(MyItem myItem, Color color, string message)
{
    this.listView1.Items.Add(
        new ListViewItem(
            new[]
            {
                message, 
                myItem.Line ,
                myItem.CurrentThread.ToString(), 
                Thread.CurrentThread.ManagedThreadId.ToString()
            }
        )
        {
            ForeColor = color
        }
    );
}

And finally, this is the method that calls the Rx API:

private void PlayWithRx()
{
    // we init the observable with the lines read from the file
    var source = this.ReadLinesFromFile("some file").ToObservable(Scheduler.TaskPool);

    source.ObserveOn(this).Subscribe(x =>
    {
        // for each line read, we update the UI
        this.DisplayResults(x, Color.Red, "Read");

        // for each line read, we subscribe the line to the ProcessLine method
        var process = Observable.Start(() => this.ProcessLine(x.Line), Scheduler.TaskPool)
            .ObserveOn(this).Subscribe(c =>
            {
                // for each line processed, we update the UI
                this.DisplayResults(c, Color.Blue, "Processed");

                // for each line processed we subscribe to the final process the UpdateResultToDatabase method
                // finally, we update the UI when the line processed has been saved to the database
                var persist = Observable.Start(() => this.UpdateResultToDatabase(c.Line), Scheduler.TaskPool)
                    .ObserveOn(this).Subscribe(z => this.DisplayResults(z, Color.Black, "Saved"));
            });
    });
}

This process runs entirely in the background. This is the output generated:

(screenshot of the ListView output)

Jupaol
  • If you want to use Rx, you're likely going to want to use JeffVa's code for reading lines from a file as an observable - see http://blogs.msdn.com/b/jeffva/archive/2010/07/26/rx-on-the-server-part-2-of-n-asynchronous-streamreader.aspx – James Manning May 31 '12 at 05:26

In an async/await world, you'd have something like:

public async Task ProcessFileAsync(string filename)
{
    var lines = await ReadLinesFromFileAsync(filename);
    var parsed = await ParseLinesAsync(lines);
    await UpdateDatabaseAsync(parsed);
}

Then a caller could just do var tasks = filenames.Select(ProcessFileAsync).ToArray(); and combine the tasks however the context requires (Task.WaitAll, Task.WhenAll, etc.).
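That caller pattern can be written out as a small sketch. ProcessFileAsync is the method from the answer above; a stub is included here only so the fragment compiles on its own.

```csharp
using System.Linq;
using System.Threading.Tasks;

class Caller
{
    // Stub standing in for the ProcessFileAsync shown in the answer.
    static Task ProcessFileAsync(string filename) => Task.CompletedTask;

    static async Task ProcessAllAsync(string[] filenames)
    {
        // One pipeline per file, all running concurrently.
        var tasks = filenames.Select(ProcessFileAsync).ToArray();
        await Task.WhenAll(tasks);
    }
}
```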

James Manning

Use a couple of BlockingCollection<T> instances. Here is an example.

The idea is that you create a producer that puts data into the collection:

while (true) {
    var data = ReadData();
    blockingCollection1.Add(data);
}

Then you create any number of consumers that read from the collection:

while (true) {
    var data = blockingCollection1.Take();
    var processedData = ProcessData(data);
    blockingCollection2.Add(processedData);
}

and so on
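Wiring those loops together, the log-file pipeline from the question might look like the following sketch. The reading, parsing, and writing bodies are hypothetical placeholders; the input is simulated so the example is self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class PipelineSketch
{
    static void Main()
    {
        // Bounded capacities act as back-pressure: a fast producer blocks
        // on Add until the slow stage catches up.
        var lines  = new BlockingCollection<string>(boundedCapacity: 100);
        var parsed = new BlockingCollection<string>(boundedCapacity: 100);

        var reader = Task.Run(() =>
        {
            foreach (var i in Enumerable.Range(1, 10))
                lines.Add("line " + i);       // stand-in for reading the file
            lines.CompleteAdding();           // unblocks the consumers when done
        });

        // Parsing is the bottleneck, so run two parsers against the same buffer.
        var parsers = Enumerable.Range(0, 2).Select(_ => Task.Run(() =>
        {
            foreach (var line in lines.GetConsumingEnumerable())
                parsed.Add("parsed " + line); // stand-in for parsing
        })).ToArray();

        var writer = Task.Run(() =>
        {
            foreach (var item in parsed.GetConsumingEnumerable())
                Console.WriteLine(item);      // stand-in for the database write
        });

        Task.WaitAll(parsers);
        parsed.CompleteAdding();              // all parsers finished: close second buffer
        writer.Wait();
        reader.Wait();
    }
}
```

Note how CompleteAdding propagates shutdown stage by stage, so GetConsumingEnumerable terminates cleanly instead of blocking forever.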

You can also let the TPL handle the number of consumers by using Parallel.ForEach:

Parallel.ForEach(blockingCollection1.GetConsumingPartitioner(),
                 data => {
                          var processedData = ProcessData(data);
                          blockingCollection2.Add(processedData);
                 });

(Note that you need to use GetConsumingPartitioner, not GetConsumingEnumerable; see here.)

adrianm