I have an issue with concurrent data processing. My PC is running out of RAM quickly. Any advice on how to fix my concurrent implementation?
Common class:
public class CalculationResult
{
    public int Count { get; set; }
    public decimal[] RunningTotals { get; set; }

    public CalculationResult(decimal[] profits)
    {
        this.Count = 1;
        this.RunningTotals = new decimal[12];
        profits.CopyTo(this.RunningTotals, 0);
    }

    public void Update(decimal[] newData)
    {
        this.Count++;
        // sum arrays
        for (int i = 0; i < 12; i++)
            this.RunningTotals[i] = this.RunningTotals[i] + newData[i];
    }

    public void Update(CalculationResult otherResult)
    {
        this.Count += otherResult.Count;
        // sum arrays
        for (int i = 0; i < 12; i++)
            this.RunningTotals[i] = this.RunningTotals[i] + otherResult.RunningTotals[i];
    }
}
The single-core implementation is as follows:
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
foreach (var i in itterations)
{
    // do the processing
    // ..
    string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
    if (combinations.ContainsKey(combination))
        combinations[combination].Update(newData);
    else
        combinations.Add(combination, new CalculationResult(newData));
}
Multi-core implementation:
ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
Parallel.ForEach(itterations, (i, state) =>
{
    Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
    // do the processing
    // ..
    // add combination to combinations -> same logic as in single core implementation
    results.Add(combinations);
});
Dictionary<string, CalculationResult> combinationsReal = new Dictionary<string, CalculationResult>();
foreach (var item in results)
{
    foreach (var pair in item)
    {
        if (combinationsReal.ContainsKey(pair.Key))
            combinationsReal[pair.Key].Update(pair.Value);
        else
            combinationsReal.Add(pair.Key, pair.Value);
    }
}
The issue I am having is that almost every combinations dictionary ends up with about 930k records in it, which on average consumes 400 MB of RAM.
Now, in the single-core implementation there is only one such dictionary, and all checks are performed against it. But this approach is slow, and I want to use multi-core optimizations.
In the multi-core implementation a ConcurrentBag instance is created which holds all the combinations dictionaries. As soon as the multi-threaded job is finished, all dictionaries are aggregated into one. This approach works well for a small number of concurrent iterations; for example, for 4 iterations my RAM usage was ~1.5 GB. The issue arises when I set the full number of parallel iterations, which is 200! No amount of PC RAM is enough to hold 200 dictionaries with nearly a million records each!
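For reference, here is a sketch (an illustration only, not the code I am benchmarking) of one way to keep memory bounded: the Parallel.ForEach overload with localInit/localFinally gives each worker thread a single private dictionary and merges it into one shared result when that worker finishes its partition, so memory scales with the degree of parallelism rather than with the 200 iterations. The names merged and mergeLock exist only for this sketch; newData and the combination string come from the processing, as above.
// Sketch: thread-local dictionaries merged per worker, instead of 200 dictionaries in a ConcurrentBag.
Dictionary<string, CalculationResult> merged = new Dictionary<string, CalculationResult>();
object mergeLock = new object();

Parallel.ForEach(
    itterations,
    // localInit: one private dictionary per worker thread
    () => new Dictionary<string, CalculationResult>(),
    // body: same logic as the single-core version, but against the thread-local dictionary
    (i, state, local) =>
    {
        // do the processing
        // ..
        string combination = "1,2,3,4,42345,52,523"; // determined during the processing
        if (local.ContainsKey(combination))
            local[combination].Update(newData);
        else
            local.Add(combination, new CalculationResult(newData));
        return local;
    },
    // localFinally: merge the worker's dictionary into the shared one exactly once, under a lock
    local =>
    {
        lock (mergeLock)
        {
            foreach (var pair in local)
            {
                if (merged.ContainsKey(pair.Key))
                    merged[pair.Key].Update(pair.Value);
                else
                    merged.Add(pair.Key, pair.Value);
            }
        }
    });
With this shape, only about (worker count + 1) dictionaries are alive at any moment, i.e. roughly 400 MB per worker instead of 400 MB per iteration.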
I was thinking about using a ConcurrentDictionary, until I found out that TryAdd does not guarantee the integrity of the added data in my situation, as I also need to run updates on the running totals.
The only real multi-threaded option left seems to be, instead of adding all combinations to a dictionary, to save them to some DB. Data aggregation would then be a matter of a single SQL select statement with a group by clause... but I don't like the idea of creating a temporary table and running a DB instance just for that.
Is there a workaround for how to process the data concurrently and not run out of RAM?
EDIT:
Maybe the real question should have been: how do I make updating of RunningTotals thread-safe when using a ConcurrentDictionary? I have just run across this thread, with a similar issue with ConcurrentDictionary, but my situation seems to be more complicated, as I have an array that needs to be updated. I am still investigating this matter.
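A sketch of one possible lock-free pattern (illustration only): treat the stored value as immutable and let AddOrUpdate retry the update delegate until it wins. The Merge method shown here is hypothetical, it is not part of the CalculationResult class above, and it would have to return a new CalculationResult instead of mutating the existing one; combinations is assumed to be a ConcurrentDictionary<string, CalculationResult>.
// Hypothetical: Merge returns a NEW CalculationResult and leaves "existing" untouched,
// so AddOrUpdate can safely re-run the delegate if another thread won the race.
combinations.AddOrUpdate(
    combination,
    key => new CalculationResult(newData),        // add path, same as the single-core code
    (key, existing) => existing.Merge(newData));  // update path, a pure function
AddOrUpdate keeps retrying against the then-current value, so no update is lost as long as the delegate has no side effects; the cost is a fresh 12-element array allocation per update.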
EDIT2: Here is a working solution with ConcurrentDictionary. All I needed to do was add a lock on the value stored under the dictionary key.
ConcurrentDictionary<string, CalculationResult> combinations = new ConcurrentDictionary<string, CalculationResult>();
Parallel.ForEach(itterations, (i, state) =>
{
    // do the processing
    // ..
    string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
    if (combinations.ContainsKey(combination))
    {
        lock (combinations[combination])
            combinations[combination].Update(newData);
    }
    else
        combinations.TryAdd(combination, new CalculationResult(newData));
});
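One small caveat with the snippet above: if two threads reach the else branch with the same brand-new combination at the same moment, one TryAdd loses and that thread's newData is silently dropped. A sketch that closes this window (again, only an illustration) uses GetOrAdd and then always updates under the per-entry lock; it assumes a parameterless CalculationResult constructor, which the class above does not have, initializing Count to 0 and RunningTotals to a zeroed decimal[12].
// Sketch only: CalculationResult() here is an assumed parameterless constructor
// that starts Count = 0 and RunningTotals = new decimal[12].
CalculationResult entry = combinations.GetOrAdd(combination, _ => new CalculationResult());
lock (entry)
    entry.Update(newData); // Update bumps Count and folds newData into RunningTotals
GetOrAdd may discard a racing factory result, but the factory only produces an empty accumulator, so nothing is lost; every thread then folds its own newData in under the lock.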
Single-threaded code execution time is 1m 48s, whereas this solution's execution time is 1m 7s for 4 iterations (a 37% performance increase). I am still wondering whether the SQL approach will be any faster with millions of records. I will possibly test it out tomorrow and update.
Edit 3: For those of you wondering what's wrong with ConcurrentDictionary updates on a value, run this code with and without the lock.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Result
{
    public int Count { get; set; }
}

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Start");

        List<int> keys = new List<int>();
        for (int i = 0; i < 100; i++)
            keys.Add(i);

        ConcurrentDictionary<int, Result> dict = new ConcurrentDictionary<int, Result>();
        Parallel.For(0, 8, i =>
        {
            foreach (var key in keys)
            {
                if (dict.ContainsKey(key))
                {
                    //lock (dict[key]) // uncomment this
                    dict[key].Count++;
                }
                else
                    dict.TryAdd(key, new Result());
            }
        });

        // any output here is incorrect behavior; best result = no lines
        foreach (var item in dict)
            if (item.Value.Count != 7) { Console.WriteLine($"{item.Key}; {item.Value.Count}"); }

        Console.WriteLine($"Finish");
        Console.ReadKey();
    }
}
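The root cause is that Count++ on a shared Result object is a non-atomic read-modify-write. For a plain int counter, a hedged alternative to the lock is Interlocked.Increment; the sketch below is a standalone variant of the test above and assumes Count is changed from an auto-property to a public field, because Interlocked needs a ref to a field. This does not carry over to the decimal[] RunningTotals in my real class, since Interlocked has no decimal overloads, so those updates still need the lock.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Result
{
    public int Count; // a field (not an auto-property) so it can be passed by ref to Interlocked
}

class InterlockedDemo
{
    static void Main()
    {
        List<int> keys = new List<int>();
        for (int i = 0; i < 100; i++)
            keys.Add(i);

        ConcurrentDictionary<int, Result> dict = new ConcurrentDictionary<int, Result>();
        Parallel.For(0, 8, i =>
        {
            foreach (var key in keys)
            {
                if (dict.TryGetValue(key, out Result r))
                    Interlocked.Increment(ref r.Count); // atomic increment, no lost updates
                else
                    dict.TryAdd(key, new Result());
            }
        });

        // Lost increments are gone; a count of 6 can still appear if two threads race
        // on the initial TryAdd, since the losing thread neither adds nor increments.
        foreach (var item in dict)
            if (item.Value.Count != 7) Console.WriteLine($"{item.Key}; {item.Value.Count}");
        Console.WriteLine("Finish");
    }
}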
Edit 4: After some trial and error I couldn't optimize the SQL approach; it turned out to be the worst idea :) I used an SQLite database, both in-memory and in-file, with transactions and reusable SQL command parameters. Due to the huge number of records that needed to be inserted, the performance is lacking. Data aggregation is the easiest part, but just inserting 4 million rows takes a huge amount of time; I can't even begin to imagine how 240 million rows could be processed efficiently. So far (and also strangely), the ConcurrentBag approach seems to be the fastest on my PC, followed by the ConcurrentDictionary approach. ConcurrentBag is a bit heavier on memory, though. Thanks to the work of @Alisson, it is now perfectly fine to use it for a larger set of iterations!