This will be a long question, sorry in advance. I don't expect a full code solution, I am looking for some input from people with a different perspective and some more experience than me.
My company is developing software for a product that performs some rather expensive calculations on film recorded by an IR camera, in which every pixel contains a temperature value. The most costly of those methods is called Thermal Signal Reconstruction (if you are interested, you can read about it here: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4321698/ ). It basically performs a polynomial fit for each pixel over time (i.e., across the frames of the film). My C# implementation looks something like this:
public static double[,,] ThermalSignalReconstruction(List<Frame<double>> thermalFilm, byte polyOrder)
{
    Resolution filmResolution = thermalFilm[0].Resolution;
    uint width = filmResolution.Width;
    uint height = filmResolution.Height;
    int frames = thermalFilm.Count;
    double[,,] result = new double[polyOrder + 1, height, width];

    // using frame indexes as x-values for poly fit
    List<double> frameIndexes = new List<double>(frames);
    for (var frame = 0U; frame < frames; ++frame)
        frameIndexes.Add(frame);

    // make local copy of thermal film and fill with difference images
    List<Frame<double>> localThermalFilm = new List<Frame<double>>(frames);
    for (var frame = 0U; frame < frames; ++frame)
        localThermalFilm.Add(new Frame<double>(filmResolution));
    Parallel.For(0U, frames, frame =>
    {
        for (var row = 0U; row < height; ++row)
            for (var col = 0U; col < width; ++col)
                localThermalFilm[(int)frame].Data[row, col] = thermalFilm[(int)frame].Data[row, col] - thermalFilm[0].Data[row, col];
    });

    // determine flashpoint by finding the frame with the maximum average pixel value
    // (the lock guards the shared maximum against concurrent updates)
    double maxAverage = double.MinValue;
    uint maxIndex = 0U;
    object maxLock = new object();
    Parallel.For(0U, frames, frame =>
    {
        double average = Math.MatrixMean(localThermalFilm[(int)frame].Data);
        lock (maxLock)
        {
            if (average > maxAverage)
            {
                maxAverage = average;
                maxIndex = (uint)frame;
            }
        }
    });

    // remove frames preceding flashpoint, including itself, from film
    localThermalFilm.RemoveRange(0, (int)maxIndex + 1);
    frameIndexes.RemoveRange(0, (int)maxIndex + 1);
    frames -= (int)maxIndex + 1;

    // calculate base 10 logarithm of all pixels and frame indexes
    Parallel.For(0U, frames, frame =>
    {
        for (var row = 0U; row < height; ++row)
            for (var col = 0U; col < width; ++col)
                localThermalFilm[(int)frame].Data[row, col] = System.Math.Log10(localThermalFilm[(int)frame].Data[row, col]);
        frameIndexes[(int)frame] = System.Math.Log10(frameIndexes[(int)frame]);
    });

    // perform polynomial fit for each pixel
    Parallel.For(0U, height, row =>
    {
        for (var col = 0U; col < width; ++col)
        {
            // extract poly fit input y-values for current pixel
            double[] pixelValues = new double[frames];
            for (var frame = 0U; frame < frames; ++frame)
                pixelValues[frame] = localThermalFilm[(int)frame].Data[row, col];
            // (...) do some value validations
            // poly fit for current pixel - this is the longest step
            double[] coefficients = Math.PolynomialRegression(frameIndexesValidated.ToArray(), pixelValuesValidated.ToArray(), polyOrder);
            // insert into coefficient images result array
            for (var coefficient = 0U; coefficient < result.GetLength(0); ++coefficient)
                result[coefficient, row, col] = coefficients[coefficient];
        }
    });
    return result;
}
As you can see, several parallelized loops performing various operations on the frames run in sequence, with the polynomial fit (Math.PolynomialRegression) being the last and by far the most expensive step. It is a polynomial fit routine I pieced together myself, since none exists in the standard System.Math library, and the only alternative I tried, from the Math.NET library, actually runs slower than the one I wrote. My code is based on the examples given on Rosetta Code: https://rosettacode.org/wiki/Polynomial_regression
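For reference, the approach on that page builds and solves the least-squares normal equations. Fitting p(x) = a_0 + a_1 x + ... + a_k x^k to N samples (x_i, y_i) by minimizing the squared residuals yields the linear system that the sigmaX/sigmaY arrays in my implementation assemble:

```latex
% minimising \sum_{i=1}^{N} \bigl(y_i - \sum_{j=0}^{k} a_j x_i^j\bigr)^2
% over the coefficients a_j gives the normal equations
\sum_{j=0}^{k} \left( \sum_{i=1}^{N} x_i^{\,m+j} \right) a_j
    \;=\; \sum_{i=1}^{N} x_i^{\,m}\, y_i,
\qquad m = 0, 1, \dots, k
```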
My point is that I had written this entire algorithm in unmanaged C++ before. Our company decided to move away from that, because of licensing issues with the GUI framework we were using at the time, and to use C#/.NET instead. A direct comparison of the old unmanaged C++ code with the managed C# code posted above recently showed me that the C# version takes about 48% (!!!) longer to execute, even though the algorithm is identical. I am aware that C# is a higher-level, managed language and therefore further removed from the machine than C++, so I fully expected it to run slower, but I didn't expect the gap to be this large. 48% is quite a big deal, which leads me to believe I may be doing something wrong. On the other hand, I don't have that much experience yet, so to be perfectly honest I also don't really know what to expect in a situation like this.
What I have tried so far:
- switching the individual loops between sequential and parallelized execution; it is fastest with all of them parallelized, as above
- reducing the shared state accessed by the parallel loop iterations (e.g. declaring separate width and height variables before the loop instead of accessing the same resolution object each time), which already improved performance quite a bit, but the 48% gap remained
- a Parallel.ForEach(Partitioner.Create(0, frames) ... ) approach, i.e. partitioning the data more coarsely with the Partitioner class; it didn't help and actually made the code run slower
- optimizing other functions that are called, as well as the code on this function's caller's side, as best I can
To home in on the question: is it even possible to make C# code like this run with performance comparable to the same code in C++, and if so, how? Or is what I observed perfectly normal, something I simply have to live with?
EDIT: added the first three loop bodies in TSR per request. My polynomial regression implementation looks like this:
public static double[] PolynomialRegression(in double[] xValues, in double[] yValues, byte order)
{
    Debug.Assert(xValues != null && yValues != null);
    Debug.Assert(xValues.Length == yValues.Length);
    Debug.Assert(xValues.Length != 0 || yValues.Length != 0);
    int dataSamples = xValues.Length;
    double[] result = new double[order + 1];

    // array containing N, sigma(xi), sigma(xi^2), sigma(xi^3) ... sigma(xi^(2*poly_order)), where N = number of samples
    double[] sigmaX = new double[2 * order + 1];
    for (var index = 0U; index < sigmaX.Length; ++index)
    {
        sigmaX[index] = 0.0;
        for (var dataPoint = 0U; dataPoint < dataSamples; ++dataPoint)
            sigmaX[index] += System.Math.Pow(xValues[(int)dataPoint], index);
    }

    // array containing sigma(yi), sigma(xi*yi), sigma(xi^2*yi) ... sigma(xi^poly_order*yi)
    double[] sigmaY = new double[order + 1];
    for (var pOrder = 0U; pOrder < sigmaY.Length; ++pOrder)
    {
        sigmaY[pOrder] = 0.0;
        for (var dataPoint = 0U; dataPoint < dataSamples; ++dataPoint)
            sigmaY[pOrder] += System.Math.Pow(xValues[(int)dataPoint], pOrder) * yValues[(int)dataPoint];
    }

    // equation system's augmented normal matrix
    int matrixRows = order + 1;
    int matrixCols = order + 2;
    double[,] matrix = new double[matrixRows, matrixCols];
    for (var row = 0U; row < matrixRows; ++row)
        for (var col = 0U; col < matrixCols - 1; ++col)
            matrix[row, col] = sigmaX[row + col];
    for (var row = 0U; row < matrixRows; ++row)
        matrix[row, order + 1] = sigmaY[row];

    // pivotisation of matrix
    for (var pivotRow = 0U; pivotRow < matrixRows; ++pivotRow)
        for (var lowerRow = pivotRow + 1U; lowerRow < matrixRows; ++lowerRow)
            if (matrix[pivotRow, pivotRow] < matrix[lowerRow, pivotRow])
                for (var col = 0U; col < matrixCols; ++col)
                {
                    double temp = matrix[pivotRow, col];
                    matrix[pivotRow, col] = matrix[lowerRow, col];
                    matrix[lowerRow, col] = temp;
                }

    // Gaussian elimination
    for (var pivotRow = 0U; pivotRow < matrixRows; ++pivotRow)
        for (var lowerRow = pivotRow + 1U; lowerRow < matrixRows; ++lowerRow)
        {
            double ratio = matrix[lowerRow, pivotRow] / matrix[pivotRow, pivotRow];
            for (var col = 0U; col < matrixCols; ++col)
                matrix[lowerRow, col] -= ratio * matrix[pivotRow, col];
        }

    // back-substitution
    for (var row = (short)order; row >= 0; --row)
    {
        result[row] = matrix[row, order + 1];
        for (var col = 0U; col < matrixCols - 1; ++col)
            if (col != row)
                result[row] -= matrix[row, col] * result[col];
        result[row] /= matrix[row, row];
    }
    return result;
}