I am trying to optimize the processing of a big collection (~1 million items or so). I am thinking about using Parallel.ForEach, but it does not seem to make things any faster in my case.
Basically, the process is twofold:
1. Process the collection with some CPU-bound operations
2. Perform some IO-bound work (database-related stuff)
One optimization I was thinking of is to hand the items that have already been processed in step 1 over to the IO part through a BlockingCollection.
The theory is that running step 1 with Parallel.ForEach is faster than a sequential approach, and that it would be even better if the IO could start as soon as a result of the CPU-bound operation is available (this part has to be done sequentially because of the related DB locks), so that at least step 1 is optimized and faster.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

public static class Program
{
    [CoreJob]
    [RPlotExporter, RankColumn]
    public class Paralleling
    {
        private IEnumerable<int> _items;

        [GlobalSetup]
        public void Setup()
        {
            _items = Enumerable.Range(0, 1000);
        }

        // Stand-in for the CPU-bound work.
        public static long Ackermann(long m, long n)
        {
            if (m > 0)
            {
                if (n > 0)
                {
                    return Ackermann(m - 1, Ackermann(m, n - 1));
                }
                if (n == 0)
                {
                    return Ackermann(m - 1, 1);
                }
            }
            else if (m == 0)
            {
                if (n >= 0)
                {
                    return n + 1;
                }
            }
            throw new ArgumentOutOfRangeException();
        }

        [Benchmark]
        public async Task ParallelAndSequential()
        {
            var blockingCollection = new BlockingCollection<(bool, int, int)>();

            // Producer: runs the CPU-bound part in parallel on the thread pool.
            var producer = Task.Run(() =>
            {
                Parallel.ForEach(_items, item =>
                {
                    Ackermann(0, 2);
                    var dummy = item % 2 == 0;
                    blockingCollection.Add((dummy, item, Thread.CurrentThread.ManagedThreadId));
                });
                blockingCollection.CompleteAdding();
            });

            // Consumer: the simulated IO part, done sequentially as results arrive.
            using (var streamWriter = new StreamWriter(new MemoryStream()))
            {
                foreach (var result in blockingCollection.GetConsumingEnumerable())
                {
                    await streamWriter.WriteLineAsync(result.ToString());
                    await Task.Delay(10);
                }
            }

            // Observe the producer so that any exception it threw is not lost.
            await producer;
        }

        [Benchmark]
        public async Task AllSequential()
        {
            using (var streamWriter = new StreamWriter(new MemoryStream()))
            {
                foreach (var item in _items)
                {
                    Ackermann(0, 2);
                    var dummy = item % 2 == 0;
                    var result = (dummy, item, Thread.CurrentThread.ManagedThreadId);
                    await streamWriter.WriteLineAsync(result.ToString());
                    await Task.Delay(10);
                }
            }
        }
    }

    public static void Main(params string[] args)
    {
        var summary = BenchmarkRunner.Run<Paralleling>();
    }
}
The results of the benchmark:
// Validating benchmarks:
// ***** BenchmarkRunner: Start *****
// ***** Found 2 benchmark(s) in total *****
// ***** Building 1 exe(s) in Parallel: Start *****
// start dotnet restore /p:UseSharedCompilation=false /p:BuildInParallel=false /m:1 in C:\Users\eperret\Desktop\Playground\ConsoleApp\ConsoleApp\ConsoleApp\bin\Release\netcoreapp2.2\e6babe6d-16ff-42cd-aa3e-d457250f812c
// command took 1.83s and exited with 0
// start dotnet build -c Release --no-restore /p:UseSharedCompilation=false /p:BuildInParallel=false /m:1 in C:\Users\eperret\Desktop\Playground\ConsoleApp\ConsoleApp\ConsoleApp\bin\Release\netcoreapp2.2\e6babe6d-16ff-42cd-aa3e-d457250f812c
// command took 3.48s and exited with 0
// ***** Done, took 00:00:05 (5.46 sec) *****
// Found 2 benchmarks:
// Paralleling.ParallelAndSequential: Core(Runtime=Core)
// Paralleling.AllSequential: Core(Runtime=Core)
Setup power plan (GUID: 8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c FriendlyName: High performance)
// **************************
// Benchmark: Paralleling.ParallelAndSequential: Core(Runtime=Core)
// *** Execute ***
// Launch: 1 / 1
// Execute: dotnet "e6babe6d-16ff-42cd-aa3e-d457250f812c.dll" --benchmarkName "ConsoleApp.Program+Paralleling.ParallelAndSequential" --job "Core" --benchmarkId 0 in C:\Users\eperret\Desktop\Playground\ConsoleApp\ConsoleApp\ConsoleApp\bin\Release\netcoreapp2.2\e6babe6d-16ff-42cd-aa3e-d457250f812c\bin\Release\netcoreapp2.2
// BeforeAnythingElse
// Benchmark Process Environment Information:
// Runtime=.NET Core 2.2.3 (CoreCLR 4.6.27414.05, CoreFX 4.6.27414.05), 64bit RyuJIT
// GC=Concurrent Workstation
// Job: Core(Runtime=Core)
OverheadJitting 1: 1 op, 595300.00 ns, 595.3000 us/op
WorkloadJitting 1: 1 op, 15646340800.00 ns, 15.6463 s/op
WorkloadWarmup 1: 1 op, 15623246700.00 ns, 15.6232 s/op
WorkloadWarmup 2: 1 op, 15633394200.00 ns, 15.6334 s/op
WorkloadWarmup 3: 1 op, 15621610400.00 ns, 15.6216 s/op
WorkloadWarmup 4: 1 op, 15623904400.00 ns, 15.6239 s/op
WorkloadWarmup 5: 1 op, 15628894600.00 ns, 15.6289 s/op
WorkloadWarmup 6: 1 op, 15619927500.00 ns, 15.6199 s/op
// BeforeActualRun
WorkloadActual 1: 1 op, 15622656400.00 ns, 15.6227 s/op
WorkloadActual 2: 1 op, 15625515000.00 ns, 15.6255 s/op
WorkloadActual 3: 1 op, 15615469600.00 ns, 15.6155 s/op
WorkloadActual 4: 1 op, 15631936300.00 ns, 15.6319 s/op
WorkloadActual 5: 1 op, 15619036800.00 ns, 15.6190 s/op
WorkloadActual 6: 1 op, 15622770800.00 ns, 15.6228 s/op
WorkloadActual 7: 1 op, 15625282100.00 ns, 15.6253 s/op
WorkloadActual 8: 1 op, 15621714600.00 ns, 15.6217 s/op
WorkloadActual 9: 1 op, 15641690200.00 ns, 15.6417 s/op
WorkloadActual 10: 1 op, 15661029200.00 ns, 15.6610 s/op
WorkloadActual 11: 1 op, 15625002000.00 ns, 15.6250 s/op
WorkloadActual 12: 1 op, 15614647200.00 ns, 15.6146 s/op
WorkloadActual 13: 1 op, 15630444900.00 ns, 15.6304 s/op
WorkloadActual 14: 1 op, 15620751600.00 ns, 15.6208 s/op
WorkloadActual 15: 1 op, 15639731400.00 ns, 15.6397 s/op
// AfterActualRun
WorkloadResult 1: 1 op, 15622656400.00 ns, 15.6227 s/op
WorkloadResult 2: 1 op, 15625515000.00 ns, 15.6255 s/op
WorkloadResult 3: 1 op, 15615469600.00 ns, 15.6155 s/op
WorkloadResult 4: 1 op, 15631936300.00 ns, 15.6319 s/op
WorkloadResult 5: 1 op, 15619036800.00 ns, 15.6190 s/op
WorkloadResult 6: 1 op, 15622770800.00 ns, 15.6228 s/op
WorkloadResult 7: 1 op, 15625282100.00 ns, 15.6253 s/op
WorkloadResult 8: 1 op, 15621714600.00 ns, 15.6217 s/op
WorkloadResult 9: 1 op, 15641690200.00 ns, 15.6417 s/op
WorkloadResult 10: 1 op, 15625002000.00 ns, 15.6250 s/op
WorkloadResult 11: 1 op, 15614647200.00 ns, 15.6146 s/op
WorkloadResult 12: 1 op, 15630444900.00 ns, 15.6304 s/op
WorkloadResult 13: 1 op, 15620751600.00 ns, 15.6208 s/op
WorkloadResult 14: 1 op, 15639731400.00 ns, 15.6397 s/op
GC: 0 0 0 0 0
// AfterAll
Mean = 15.6255 s, StdErr = 0.0022 s (0.01%); N = 14, StdDev = 0.0081 s
Min = 15.6146 s, Q1 = 15.6208 s, Median = 15.6239 s, Q3 = 15.6304 s, Max = 15.6417 s
IQR = 0.0097 s, LowerFence = 15.6062 s, UpperFence = 15.6450 s
ConfidenceInterval = [15.6164 s; 15.6346 s] (CI 99.9%), Margin = 0.0091 s (0.06% of Mean)
Skewness = 0.66, Kurtosis = 2.36, MValue = 2
// **************************
// Benchmark: Paralleling.AllSequential: Core(Runtime=Core)
// *** Execute ***
// Launch: 1 / 1
// Execute: dotnet "e6babe6d-16ff-42cd-aa3e-d457250f812c.dll" --benchmarkName "ConsoleApp.Program+Paralleling.AllSequential" --job "Core" --benchmarkId 1 in C:\Users\eperret\Desktop\Playground\ConsoleApp\ConsoleApp\ConsoleApp\bin\Release\netcoreapp2.2\e6babe6d-16ff-42cd-aa3e-d457250f812c\bin\Release\netcoreapp2.2
// BeforeAnythingElse
// Benchmark Process Environment Information:
// Runtime=.NET Core 2.2.3 (CoreCLR 4.6.27414.05, CoreFX 4.6.27414.05), 64bit RyuJIT
// GC=Concurrent Workstation
// Job: Core(Runtime=Core)
OverheadJitting 1: 1 op, 313300.00 ns, 313.3000 us/op
WorkloadJitting 1: 1 op, 15627659000.00 ns, 15.6277 s/op
WorkloadWarmup 1: 1 op, 15618290800.00 ns, 15.6183 s/op
WorkloadWarmup 2: 1 op, 15615060100.00 ns, 15.6151 s/op
WorkloadWarmup 3: 1 op, 15640535400.00 ns, 15.6405 s/op
WorkloadWarmup 4: 1 op, 15627643200.00 ns, 15.6276 s/op
WorkloadWarmup 5: 1 op, 15618477200.00 ns, 15.6185 s/op
WorkloadWarmup 6: 1 op, 15630480200.00 ns, 15.6305 s/op
WorkloadWarmup 7: 1 op, 15618496000.00 ns, 15.6185 s/op
// BeforeActualRun
WorkloadActual 1: 1 op, 15643436500.00 ns, 15.6434 s/op
WorkloadActual 2: 1 op, 15633023800.00 ns, 15.6330 s/op
WorkloadActual 3: 1 op, 15622361000.00 ns, 15.6224 s/op
WorkloadActual 4: 1 op, 15624673600.00 ns, 15.6247 s/op
WorkloadActual 5: 1 op, 15622833000.00 ns, 15.6228 s/op
WorkloadActual 6: 1 op, 15631459600.00 ns, 15.6315 s/op
WorkloadActual 7: 1 op, 15637421400.00 ns, 15.6374 s/op
WorkloadActual 8: 1 op, 15623196600.00 ns, 15.6232 s/op
WorkloadActual 9: 1 op, 15640573100.00 ns, 15.6406 s/op
WorkloadActual 10: 1 op, 15621312000.00 ns, 15.6213 s/op
WorkloadActual 11: 1 op, 15633047100.00 ns, 15.6330 s/op
WorkloadActual 12: 1 op, 15624742400.00 ns, 15.6247 s/op
WorkloadActual 13: 1 op, 15626075700.00 ns, 15.6261 s/op
WorkloadActual 14: 1 op, 15622062500.00 ns, 15.6221 s/op
WorkloadActual 15: 1 op, 15627008400.00 ns, 15.6270 s/op
// AfterActualRun
WorkloadResult 1: 1 op, 15643436500.00 ns, 15.6434 s/op
WorkloadResult 2: 1 op, 15633023800.00 ns, 15.6330 s/op
WorkloadResult 3: 1 op, 15622361000.00 ns, 15.6224 s/op
WorkloadResult 4: 1 op, 15624673600.00 ns, 15.6247 s/op
WorkloadResult 5: 1 op, 15622833000.00 ns, 15.6228 s/op
WorkloadResult 6: 1 op, 15631459600.00 ns, 15.6315 s/op
WorkloadResult 7: 1 op, 15637421400.00 ns, 15.6374 s/op
WorkloadResult 8: 1 op, 15623196600.00 ns, 15.6232 s/op
WorkloadResult 9: 1 op, 15640573100.00 ns, 15.6406 s/op
WorkloadResult 10: 1 op, 15621312000.00 ns, 15.6213 s/op
WorkloadResult 11: 1 op, 15633047100.00 ns, 15.6330 s/op
WorkloadResult 12: 1 op, 15624742400.00 ns, 15.6247 s/op
WorkloadResult 13: 1 op, 15626075700.00 ns, 15.6261 s/op
WorkloadResult 14: 1 op, 15622062500.00 ns, 15.6221 s/op
WorkloadResult 15: 1 op, 15627008400.00 ns, 15.6270 s/op
GC: 0 0 0 0 0
// AfterAll
Mean = 15.6289 s, StdErr = 0.0019 s (0.01%); N = 15, StdDev = 0.0072 s
Min = 15.6213 s, Q1 = 15.6228 s, Median = 15.6261 s, Q3 = 15.6330 s, Max = 15.6434 s
IQR = 0.0102 s, LowerFence = 15.6075 s, UpperFence = 15.6484 s
ConfidenceInterval = [15.6212 s; 15.6366 s] (CI 99.9%), Margin = 0.0077 s (0.05% of Mean)
Skewness = 0.69, Kurtosis = 1.99, MValue = 2
Successfully reverted power plan (GUID: 8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c FriendlyName: High performance)
// ***** BenchmarkRunner: Finish *****
// * Export *
BenchmarkDotNet.Artifacts\results\ConsoleApp.Program.Paralleling-report.csv
BenchmarkDotNet.Artifacts\results\ConsoleApp.Program.Paralleling-report-github.md
BenchmarkDotNet.Artifacts\results\ConsoleApp.Program.Paralleling-report.html
BenchmarkDotNet.Artifacts\results\ConsoleApp.Program.Paralleling-measurements.csv
BuildPlots.R
RPlotExporter couldn't find Rscript.exe in your PATH and no R_HOME environment variable is defined
// * Detailed results *
Paralleling.ParallelAndSequential: Core(Runtime=Core)
Runtime = .NET Core 2.2.3 (CoreCLR 4.6.27414.05, CoreFX 4.6.27414.05), 64bit RyuJIT; GC = Concurrent Workstation
Mean = 15.6255 s, StdErr = 0.0022 s (0.01%); N = 14, StdDev = 0.0081 s
Min = 15.6146 s, Q1 = 15.6208 s, Median = 15.6239 s, Q3 = 15.6304 s, Max = 15.6417 s
IQR = 0.0097 s, LowerFence = 15.6062 s, UpperFence = 15.6450 s
ConfidenceInterval = [15.6164 s; 15.6346 s] (CI 99.9%), Margin = 0.0091 s (0.06% of Mean)
Skewness = 0.66, Kurtosis = 2.36, MValue = 2
-------------------- Histogram --------------------
[15.612 s ; 15.645 s) | @@@@@@@@@@@@@@
---------------------------------------------------
Paralleling.AllSequential: Core(Runtime=Core)
Runtime = .NET Core 2.2.3 (CoreCLR 4.6.27414.05, CoreFX 4.6.27414.05), 64bit RyuJIT; GC = Concurrent Workstation
Mean = 15.6289 s, StdErr = 0.0019 s (0.01%); N = 15, StdDev = 0.0072 s
Min = 15.6213 s, Q1 = 15.6228 s, Median = 15.6261 s, Q3 = 15.6330 s, Max = 15.6434 s
IQR = 0.0102 s, LowerFence = 15.6075 s, UpperFence = 15.6484 s
ConfidenceInterval = [15.6212 s; 15.6366 s] (CI 99.9%), Margin = 0.0077 s (0.05% of Mean)
Skewness = 0.69, Kurtosis = 1.99, MValue = 2
-------------------- Histogram --------------------
[15.619 s ; 15.646 s) | @@@@@@@@@@@@@@@
---------------------------------------------------
// * Summary *
BenchmarkDotNet=v0.11.5, OS=Windows 10.0.17134.407 (1803/April2018Update/Redstone4)
Intel Core i7-7820HQ CPU 2.90GHz (Kaby Lake), 1 CPU, 8 logical and 4 physical cores
.NET Core SDK=2.2.202
[Host] : .NET Core 2.2.3 (CoreCLR 4.6.27414.05, CoreFX 4.6.27414.05), 64bit RyuJIT
Core : .NET Core 2.2.3 (CoreCLR 4.6.27414.05, CoreFX 4.6.27414.05), 64bit RyuJIT
Job=Core Runtime=Core
| Method | Mean | Error | StdDev | Rank |
|---------------------- |--------:|---------:|---------:|-----:|
| ParallelAndSequential | 15.63 s | 0.0091 s | 0.0081 s | 1 |
| AllSequential | 15.63 s | 0.0077 s | 0.0072 s | 1 |
// * Hints *
Outliers
Paralleling.ParallelAndSequential: Core -> 1 outlier was removed (15.66 s)
// * Legends *
Mean : Arithmetic mean of all measurements
Error : Half of 99.9% confidence interval
StdDev : Standard deviation of all measurements
Rank : Relative position of current benchmark mean among all benchmarks (Arabic style)
1 s : 1 Second (1 sec)
// ***** BenchmarkRunner: End *****
// ** Remained 0 benchmark(s) to run **
Run time: 00:11:44 (704.57 sec), executed benchmarks: 2
Global total time: 00:11:50 (710.04 sec), executed benchmarks: 2
// * Artifacts cleanup *
Process finished with exit code 0.
I don't really get why the Parallel.ForEach version is not faster than the fully sequential strategy. Is it because the BlockingCollection is the actual bottleneck?
Is there another strategy to better leverage Parallel.ForEach?