
I'm working with a memory-hungry application that makes use of Task to do processing in parallel. The problem is that it allocates a lot of memory, then hangs onto it, overloading my 16 GByte system until the GC runs. At that point its performance is awful and it can take days to finish; the original application would normally take 30 minutes to run. This is a stripped-down version of it:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var tasks = new List<Task<string[]>>();
        var report = new List<string>();

        for (int i = 0; i < 2000; i++)
        {
            tasks.Add(Task<string[]>.Factory.StartNew(DummyProcess.Process));
        }

        foreach (var task in tasks)
        {
            report.AddRange(task.Result);
        }

        Console.WriteLine("Press RETURN...");
        Console.ReadLine();
    }
}

Here's the 'processor':

using System;
using System.Collections.Generic;
using System.Threading;

public static class DummyProcess
{
    public static string[] Process()
    {
        var result = new List<string>();

        for (int i = 1; i < 10000000; i++)
        {
            result.Add($"This is a dummy string of some length [{i}]");
        }

        var random = new Random();
        var delay = random.Next(100, 300);

        Thread.Sleep(delay);

        return result.ToArray();
    }
}

The problem I believe is here:

foreach (var task in tasks)
{
   report.AddRange(task.Result);
}

The tasks don't get disposed when they're done - what's the best way to get the result (string[]) out of the task and then dispose of the task?

I did try this:

foreach (var task in tasks)
{
   report.AddRange(task.Result);
   task.Dispose();
}

However, it made little difference. What I might try is simply not returning the results at all, so that the huge 10 - 50 MBytes of strings aren't retained (in the original application).
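Something along these lines, perhaps - a rough sketch only, assuming the real Process could consume or persist its own chunk (here, hypothetically, dumping it to a file via System.IO) instead of handing a large string[] back through Task.Result:

static Task ProcessWithoutReturning(int taskIndex)
{
    return Task.Run(() =>
    {
        string[] result = DummyProcess.Process();

        // Hypothetical sink: write the chunk out immediately.
        // Nothing outside this lambda keeps a reference to 'result',
        // so the GC can reclaim it as soon as the task finishes.
        File.WriteAllLines($"report_{taskIndex}.txt", result);
    });
}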

EDIT: I tried replacing the code to read the results with the following:

while (tasks.Any())
{
    var listCopy = tasks.ToList();

    foreach (var task in listCopy)
    {
        if (task.Wait(0))
        {
            report.AddRange(task.Result);
            tasks.Remove(task);
            task.Dispose();
        }
    }

    Thread.Sleep(300);
}

I had to abort after two hours - I'll let it run overnight tonight and see if it finishes. Memory usage seemed better as it ran, but it was still slow.

imekon

2 Answers


You are right, the problem is there:

foreach (var task in tasks)
{
   report.AddRange(task.Result);
}

But the problem is much bigger than you think. Every call to Result blocks the calling thread, effectively turning your code into an over-engineered serial version that also has some Sleeps in it, too bad!

I suggest turning your code into a parallel version first, for instance by adding a continuation to every task:

task.ContinueWith(t => {
    // NOTE 1: t.Result is already available here
    // NOTE 2: you need synchronization for your data structure - a lock/mutex or a synchronized collection
    report.AddRange(t.Result);
});

Once that's done, I also suggest having every task remove itself from the task list; this will let the GC collect it ASAP, along with the heavy result it holds. I suggest using an explicit Dispose only as a last resort here. In sum:

task.ContinueWith(t => {
    // NOTE 1: t.Result is already available here
    // NOTE 2: you need synchronization for your data structure - a lock/mutex or a synchronized collection
    report.AddRange(t.Result);
    // NOTE 3: synchronize access to the task list!
    tasks.Remove(t);
});
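Put together, a minimal sketch might look like this (assuming a plain lock is enough for NOTE 2/NOTE 3, and keeping only the small continuation tasks around so Main can still wait for everything; that also sidesteps the need to remove anything from a shared task list):

var sync = new object();
var continuations = new List<Task>();

for (int i = 0; i < 2000; i++)
{
    var task = Task<string[]>.Factory.StartNew(DummyProcess.Process);
    continuations.Add(task.ContinueWith(t =>
    {
        lock (sync)                        // protect the shared report list
        {
            report.AddRange(t.Result);
        }
        // Nothing else keeps a reference to the completed task, so it and
        // its large Result can be collected after this continuation runs.
    }));
}

Task.WaitAll(continuations.ToArray());     // wait only for the lightweight continuations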

Or, alternatively, you could go one level above Task-based parallelism and use Parallel.For from the start:

ParallelLoopResult result = Parallel.For(0, 2000, ctr => {  
    // NOTE you still need to synchronize access to report
    report.Add(/*get your result*/);
});

To paraphrase this answer: while the results will be the same, this will introduce far less overhead than Tasks, especially for a large collection like yours (2000 items), and will make the overall runtime faster.
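For reference, a sketch of what that might look like with the synchronization in place - the localInit/localFinally overload of Parallel.For is used here (an assumption, not part of the snippet above) so each worker fills a private list and the lock is only taken once per partition:

var report = new List<string>();
var sync = new object();

Parallel.For(0, 2000,
    () => new List<string>(),                    // localInit: one private buffer per worker
    (i, state, local) =>
    {
        local.AddRange(DummyProcess.Process());  // no shared state touched in the hot path
        return local;
    },
    local =>
    {
        lock (sync)                              // localFinally: merge each buffer once
        {
            report.AddRange(local);
        }
    });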

Oleg Bogdanov

The task.Result will keep a reference to the result array until the task is no longer accessible from any root. This means that all of the result arrays will exist until the tasks list goes out of scope.
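As a small illustration of that point, keeping the original loop from the question but dropping the reference to each task as soon as its result has been read makes that task and its array collectible, instead of keeping all 2000 rooted through the list:

for (int i = 0; i < tasks.Count; i++)
{
    report.AddRange(tasks[i].Result);
    tasks[i] = null;   // the task, and the string[] it was holding, are no longer rooted
}

(The strings themselves are of course still referenced by report once they have been copied in.)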

Also, you create 2000 tasks, which means you can have up to 2000 sets of result data waiting in memory at the same time. If you change to a producer-consumer model and have Environment.ProcessorCount threads working through a queue holding 2000 jobs, you will have fewer things "in flight" using memory. Using a tool like TPL Dataflow you can create a pipeline with a limited number of workers, where a worker will not start new work until the previous worker's output has been processed by the next link in the chain.

using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow (TPL Dataflow) NuGet package

class Program
{
    static void Main(string[] args)
    {
        var report = new List<string>();

        // We don't use i because Process does not accept a parameter of any kind.
        var producer = new TransformBlock<int, string[]>(
            i => DummyProcess.Process(),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

        // Only 20 processed items can be in flight at once; if the queue is full it will block
        // the producer threads, of which there are only Environment.ProcessorCount.
        // Only 1 thread is used for the consumer.
        var consumer = new ActionBlock<string[]>(
            result => report.AddRange(result),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 20 });
        producer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 2000; i++)
        {
            // We just add dummy values to queue up 2000 items to be processed.
            producer.Post(i);
        }
        // Signals we are done adding to the producer.
        producer.Complete();

        // Waits for the consumer to finish processing all pending items.
        consumer.Completion.Wait();

        Console.WriteLine("Press RETURN...");
        Console.ReadLine();
    }
}
Scott Chamberlain