2

I have a simple parallel loop doing stuff, and afterwards I save the results to a file.

object[] items; // array with all items
object[] resultArray = new object[numItems];
Parallel.For(0, numItems, (i) => 
{ 
    object res = doStuff(items[i], i);
    resultArray[i] = res;
});

foreach (object res in resultArray)
{
    sequentiallySaveResult(res);
}

For the saving, I need to write the results in the correct sequential order. By putting the results in the resultArray, the order of the results is correct again.

However, as the results are pretty big and take a lot of memory. I would like to process the items in-order, as in e.g. four threads start and work on items 1-4, next free thread takes item 5 and so on.

With that, I could start another Thread, monitoring the item that needs to be written next in the array (or each thread could emit an event when an item is finished), so I can already start writing the first results while the later items are still being processed and then free the memory.

Is it possible for Parallel.For to process the items in the given order? I of course I could use a concurentQueue, put all the indices in the right order in there and start threads manually.

But if possible, I would like to keep all the automations on how many threads to use etc. that are in the ´Parallel.For´ implementation.

Disclaimer: I cannot switch to an ForEach, I need the i.

EDIT #1 :
Currently, the execution order is totally random, one example:

Processing item 1/255
Processing item 63/255
Processing item 32/255
Processing item 125/255
Processing item 94/255
Processing item 156/255
Processing item 187/255
Processing item 249/255
...

EDIT #2 :
More details to the job that is done:

I process a grayscale image and need to extract information for each "layer" (items in the example above), so I go from 0 to 255 (for 8bit) and perform a task on the image.

I have a class to access the pixel values concurrently:

 unsafe class UnsafeBitmap : IDisposable
    {

        private BitmapData bitmapData;
        private Bitmap gray;
        private int bytesPerPixel;
        private int heightInPixels;
        private int widthInBytes;
        private byte* ptrFirstPixel;

        public void PrepareGrayscaleBitmap(Bitmap bitmap, bool invert)
        {
            gray = MakeGrayscale(bitmap, invert);

            bitmapData = gray.LockBits(new Rectangle(0, 0, gray.Width, gray.Height), ImageLockMode.ReadOnly, gray.PixelFormat);
            bytesPerPixel = System.Drawing.Bitmap.GetPixelFormatSize(gray.PixelFormat) / 8;
            heightInPixels = bitmapData.Height;
            widthInBytes = bitmapData.Width * bytesPerPixel;
            ptrFirstPixel = (byte*)bitmapData.Scan0;
        }

        public byte GetPixelValue(int x, int y)
        {
            return (ptrFirstPixel + ((heightInPixels - y - 1) * bitmapData.Stride))[x * bytesPerPixel];
        }

        public void Dispose()
        {
            gray.UnlockBits(bitmapData);
        }
    }

And the loop is

UnsafeBitmap ubmp; // initialized, has the correct bitmap
int numLayers = 255;
int bitmapWidthPx = 10000;
int bitmapHeightPx = 10000;
object[] resultArray = new object[numLayer];
Parallel.For(0, numLayers, (i) => 
{ 
        for (int x = 0; x < bitmapWidthPx ; x++)
    {
        inLine = false;
        for (int y = 0; y < bitmapHeightPx ; y++)
        {
            byte pixel_value = ubmp.GetPixelValue(x, y);
            
            if (i <= pixel_value && !inLine)
            {
                result.AddStart(x,y);
                inLine = true;
            }
            else if ((i > pixel_value || y == Height - 1) && inLine)
            {
                result.AddEnd(x, y-1);
                inLine = false;
            }
        }
    }
    result_array[i] = result;
});

foreach (object res in resultArray)
{
    sequentiallySaveResult(res);
}

And I would like to also start a thread for the saving, checking if the item that needs to be written next is available, write it, discard from memory. And for this, it would be good if the processing starts in order, so that the result arrive roughly in order. If the result for layer 5 arrives second to last, I have to wait writing layer 5 (and all following) until the end.

If 4 threads start, start processing layers 1-4, and when a thread is done, starts processing layer 5, next one layer 6 and so on, the results will come more or less in the same order and I can start writing result to the file and discarding them from memory.

user3696412
  • 1,321
  • 3
  • 15
  • 33
  • 1
    You can switch to `Parallel.ForEach` and use the overload that provides the index. Besides, you shouldn't need the index if you already have all the appropriate items. Ordering is extra work, which is why neither Parallel.For, Foreach or PLINQ produce ordered results. You can request ordered results by adding [AsOrdered](https://learn.microsoft.com/en-us/dotnet/api/system.linq.parallelenumerable.asordered?view=netcore-3.1) to a PLINQ query – Panagiotis Kanavos Jun 30 '20 at 11:45
  • You don't need and *shouldn't* try to use your own threads either. For starters, PLINQ and Parallel use all available cores. They also handle partitioning of the data, load balancing, batching etc which is far more efficient than trying to do the same stuff on your own – Panagiotis Kanavos Jun 30 '20 at 11:46
  • As written, I do not even need to get the results perfectly ordered. It should just start the processing in the given order, so the results arrive about in the correct order, so I can start saving them already. If the result for item 2 arrives first, and the item 1 followes, it is alright, I can handle this in the writer, it will wait on the first item an then write in the correct order. – user3696412 Jun 30 '20 at 11:49
  • It won't. When you use parallel processing the results will come in whatever order the CPUs produce them. If one of the worker tasks works faster than the others, perhaps because its job was easier, it will produce more results than others. That's why the results have to be ordered again. Are you sure you need *parallelism* though? Parallelism means processing a lot of *in-memory only* data. If you need to load files or execute multiple steps, you're probably looking for pipeline processing – Panagiotis Kanavos Jun 30 '20 at 11:52
  • Each task takes pretty exaclty the same time. But even if on tasks works faster, it's fine. So lets say 4 threads start, start working on items 1-4. Thread 2 finishes first, starts working on item 5 and so on. So the results will roughly come out in the same correct order, so I can start writing them sequentially. But if item 5 comes only second to last, I have to wait until everthing is finished until I can write item 5 and all following. – user3696412 Jun 30 '20 at 11:56
  • And yeah, parallelism is fine for this. I will add a bit more information on the task. – user3696412 Jun 30 '20 at 11:57
  • If they did, you wouldn't have to post a question. In any case, if you want to avoid generating the result array in advance, use PLINQ and iterate over the results, don't store them in an array. That's stream processing, not ordering. `var query=items.AsParallel().Select((item, idx)=>doStuff(item, idx))` is enough to process the data in parallel. When you iterate over the query with `foreach(var item in query)` the PLINQ query will execute and produce the results – Panagiotis Kanavos Jun 30 '20 at 12:01
  • The order isn't random either. The data will be partitioned, and each worker task will receive one partition and work through it as fast as possible, in order. That's the *partition's* order though. What you posted is all partition workers emitting logs – Panagiotis Kanavos Jun 30 '20 at 12:07
  • You can prevent partitioning by feeding an IEnumerable<> instead of a List or array, or by supplying settings, but that will increase the sync overhead and won't guarantee the order in the end. Try with `AsOrdered()` first. It may be fast enough for what you want. It *doesn't* use full buffering or ordering, the way an `OrderBy()` would – Panagiotis Kanavos Jun 30 '20 at 12:12
  • From the [ParallelMergeOptions](https://learn.microsoft.com/en-us/dotnet/api/system.linq.parallelmergeoptions?view=netcore-3.1) docs `However, queries that are created by using the AsOrdered operator can be streamed as long as no further sorting is performed within the query itself.` – Panagiotis Kanavos Jun 30 '20 at 12:14
  • I added some details on the work being done in the question. I especially do not see how the PLINQ request would allow me to start writing the first results to file while the rest is still being processed. – user3696412 Jun 30 '20 at 12:20
  • Ah, I somehow missed the streaming, ok, I see how I can save while the processing is still running. – user3696412 Jun 30 '20 at 12:27
  • Bitmap? What bitmap? .NET/GDI bitmaps provide thread safety by *locking*, which means accessing individual pixels is expensive. You can avoid this by using LockBits and accessing the data directly, [as shown in this question](https://stackoverflow.com/questions/7433508/c-sharp-bitmap-image-masking-using-unsafe-code). Image processing can benefit greatly form the SIMD operations in [System.Numeric.Vector](https://docs.microsoft.com/en-us/dotnet/api/system.numerics.vector-1?view=netcore-3.1) and [Vector](https://docs.microsoft.com/en-us/dotnet/api/system.numerics.vector?view=netcore-3.1). – Panagiotis Kanavos Jun 30 '20 at 12:40
  • In fact, SIMD could remove the need for parallelism altogether, or allow you to break the bitmap in broad sections and use SIMD on each of those sections. Some of the classes in the Numerics namespace exist to accelerate 2D and 3D operations, eg `Matrix3x2` and `Matrix4x4` – Panagiotis Kanavos Jun 30 '20 at 12:42
  • What are you trying to do? My computational geometry days are 16 years in the past but I do remember that almost everything is matrix multiplications, even complex convolution calculations. If you can describe your algorithm as matrix multiplications you should be able to use SIMD to accelerate this *a lot*. Far more than simple parallelism, where each CPU is trying to load data from RAM, wasting time to look into its cache. You can't use advanced SIMD operations through Vector, but .NET Core 3 added [SIMD intrinsics](https://devblogs.microsoft.com/dotnet/hardware-intrinsics-in-net-core/) – Panagiotis Kanavos Jun 30 '20 at 12:51
  • I added some more details above. Basically, I am vectorizing the bitmap line by line so I for coherent line of pixels, I need the start end end coordinates (I cannot go into details what the application is or why it is done line by line - it just needs to be done this way). Regarding the locking, I am doing this with the UnsafeBitmap Class I wrote myself. My predecessor used Bitmap.GetPixel(), my current version ist already two orders of magnitude faster just by using the locking and direct access. But if this can be expressed through SIMD, I am eager to learn. – user3696412 Jun 30 '20 at 14:50

2 Answers2

3

The Parallel class knows how to parallelize a workload, but doesn't know how to merge the processed results. So I would suggest to use PLINQ instead. Your requirement of saving the results in the original order and concurrently with the processing, makes it a bit trickier than usual, but it is still perfectly doable:

IEnumerable<object> results = Partitioner
    .Create(items, EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select((item, index) => DoStuff(item, index))
    .AsEnumerable();

foreach (object result in results)
{
    SequentiallySaveResult(result);
}

Explanation:

  1. The AsOrdered operator is required for retrieving the results in the original order.
  2. The WithMergeOptions operator is required for preventing the buffering of the results, so that they are saved as soon as they become available.
  3. The Partitioner.Create is required because the source of data is an array, and PLINQ by default partitions arrays statically. Which means that the array is split in ranges, and one thread is allocated for processing each range. Which is a great performance optimization in general, but in this case it defeats the purpose of the timely and ordered retrieval of the results. So a dynamic partitioner is needed, to enumerate the source sequentially from start to end.
  4. The EnumerablePartitionerOptions.NoBuffering configuration prevents the worker threads employed by PLINQ from grabing more than one item at a time (which is the default PLINQ partitioning cleverness known as "chunk partitioning").
  5. The AsEnumerable is not really needed. It is there just for signifying the end of the parallel processing. The foreach that follows treats the ParallelQuery<object> as IEnumerable<object> anyway.

Because of all of this trickery required, and because this solution is not really flexible enough in case you need later to add more concurrent heterogeneous steps in the processing pipeline, I would suggest to keep in mind the option of stepping up to the TPL Dataflow library. It is a library that unlocks lots of powerful options in the realm of parallel processing.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • A more advanced approach for creating processing pipelines using PLINQ is presented [here](https://stackoverflow.com/questions/62035864/design-help-for-parallel-processing-azure-blob-and-bulk-copy-to-sql-database-c/62041200#62041200). – Theodor Zoulias Sep 10 '21 at 01:29
0

Well if you want to order thread operations, Thread Synchronization 101 teaches us to use condition variables, and to implement those in C# tasks you can use a SemaphoreSlim which provides an async wait function, SemaphoreSlim.WaitAsync. That plus a counter check will get you the desired result.

However I'm not convinced it's needed, because if I understand correctly and you just want to save them sequentially to avoid storing them in memory, you can use memory mapped files to either:

  1. If the results have the same size, simply write your buffer at the location index * size.

  2. If the results have different sizes, write to a temporary mapped file as you get your results, and have another thread copy the correct sequential output file as they come. This is an IO bound operation, so don't use the task pool for it.

Blindy
  • 65,249
  • 10
  • 91
  • 131