I need to do this kind of work:

  1. Get Page object from database
  2. For each page get all images and process them (IO bound, for example, upload to CDN)
  3. If all images were processed successfully, mark the Page as processed in the database

Since I need to control how many Pages I process in parallel, I've decided to go with TPL Dataflow:

 ____________________________
|         Data pipe          |
|   BufferBlock<Page>        |
|   BoundedCapacity = 1      |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|        Save page           |
| ActionBlock<Page>          |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 5 |
|____________________________|

Now I need the "Process images" block to process images in parallel, but I want to limit how many images are being processed at once across all pages currently in flight.

I can use TransformManyBlock for "Process images", but how do I gather the images back together in the "Save page" block?

         ____________________________
        |         Data pipe          |
        |   BufferBlock<Page>        |
        |   BoundedCapacity = 1      |
        |____________________________|
                      |
     ___________________________________
    |           Load images             |
    | TransformManyBlock<Page, Image[]> |
    | BoundedCapacity = 1               |
    | MaxDegreeOfParallelism = 8        |
    |___________________________________|
      /              |              \
   ______________________________________________
 _|____________________________________________  |
|              Process image                   | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
| BoundedCapacity = 1                          | |
| MaxDegreeOfParallelism = 8                   |_|
|______________________________________________|
      \              |               /
         How to group images by page ?
                     |
        ____________________________
       |        Save page           |
       | ActionBlock<Page>          |
       | BoundedCapacity = 1        |
       | MaxDegreeOfParallelism = 5 |
       |____________________________|

On top of that, any one of the images could fail to be processed, and I don't want to save a page with failed images.

svick
Michael Logutov
  • What's the problem exactly? According to the diagram you already figured it out. Set MaxDegreeOfParallelism in the middle step to the desired level. – usr Oct 25 '14 at 10:12
  • I've edited the question with the diagram of using TransformManyBlock. – Michael Logutov Oct 25 '14 at 10:45
  • As a side note, configuring a block with `BoundedCapacity` smaller than the `MaxDegreeOfParallelism` will reduce the degree of parallelism to the value of the capacity. In other words, the block cannot process 8 images simultaneously if it is allowed to buffer only one. – Theodor Zoulias Nov 30 '20 at 20:46
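
The last comment can be checked with a small experiment. The following is a minimal sketch, not code from the question: `CapacityDemo`, `MeasurePeakAsync`, and the `Task.Delay` stand-in for IO-bound work are all assumptions made for illustration. It pushes 20 items through an `ActionBlock` and records the highest number of items that were in flight at once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CapacityDemo
{
    // Runs 20 simulated uploads through a block and returns the peak number
    // of uploads that were in flight at the same time.
    public static async Task<int> MeasurePeakAsync(int boundedCapacity, int maxDop)
    {
        int current = 0, peak = 0;
        var block = new ActionBlock<int>(async _ =>
        {
            int now = Interlocked.Increment(ref current);
            // Lock-free "max": raise peak until it is at least 'now'.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)))
                Interlocked.CompareExchange(ref peak, now, seen);
            await Task.Delay(50); // hypothetical stand-in for IO-bound work
            Interlocked.Decrement(ref current);
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = boundedCapacity,
            MaxDegreeOfParallelism = maxDop
        });

        for (int i = 0; i < 20; i++)
            await block.SendAsync(i); // waits asynchronously while the block is full
        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

Calling `MeasurePeakAsync(1, 8)` should report a peak of 1, because items being processed count against the block's capacity. Raising `BoundedCapacity` to at least `MaxDegreeOfParallelism` lets the block use the parallelism it was given.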

3 Answers

7

You can group the images together by recording each image as it arrives for a given page and sending the page on once all of its images have arrived. To do that, the page needs to know how many images it contains, but I assume you know that.

In code, it could look something like this:

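public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
    Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
    // Number of split items seen so far for each merged item still in flight.
    var dictionary = new Dictionary<TMerged, int>();

    // Default options give MaxDegreeOfParallelism = 1, so the dictionary is
    // only ever touched by one thread at a time.
    return new TransformManyBlock<TSplit, TMerged>(
        split =>
        {
            var merged = getMergedFunc(split);
            int count;
            dictionary.TryGetValue(merged, out count); // count stays 0 if absent
            count++;
            if (getSplitCount(merged) == count)
            {
                // Last piece arrived: forget the counter and emit the merged item.
                dictionary.Remove(merged);
                return new[] { merged };
            }

            // Still waiting for more pieces: remember the count and emit nothing.
            dictionary[merged] = count;
            return new TMerged[0];
        });
}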

Usage:

var dataPipe = new BufferBlock<Page>();

var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);

var savePage = new ActionBlock<Page>(
    page => { /* save the page here */ },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);
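
To make the counting idea easier to follow end to end, here is a minimal runnable sketch. It is an illustration under assumptions rather than the code above: the `Page` and `ImageWithPage` classes are hypothetical stand-ins for the question's types, `Task.Delay` simulates the upload, and the links use `PropagateCompletion` so that the pipeline can be awaited (the usage above does not show how completion is handled).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins for the question's types.
public sealed class Page
{
    public Page(string name, int imageCount) { Name = name; ImageCount = imageCount; }
    public string Name { get; }
    public int ImageCount { get; }
}

public sealed class ImageWithPage
{
    public ImageWithPage(Page page, int index) { Page = page; Index = index; }
    public Page Page { get; }
    public int Index { get; }
}

public static class MergeDemo
{
    // Emits a page once the number of images seen for it equals ImageCount.
    static IPropagatorBlock<ImageWithPage, Page> CreateMerger()
    {
        var seen = new Dictionary<Page, int>();
        // Default MaxDegreeOfParallelism is 1, so the dictionary needs no lock.
        return new TransformManyBlock<ImageWithPage, Page>(image =>
        {
            seen.TryGetValue(image.Page, out int count);
            count++;
            if (count == image.Page.ImageCount)
            {
                seen.Remove(image.Page);
                return new[] { image.Page };
            }
            seen[image.Page] = count;
            return Array.Empty<Page>();
        });
    }

    public static async Task<List<string>> RunAsync()
    {
        var saved = new List<string>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var splitter = new TransformManyBlock<Page, ImageWithPage>(page =>
            Enumerable.Range(0, page.ImageCount).Select(i => new ImageWithPage(page, i)));
        var process = new TransformBlock<ImageWithPage, ImageWithPage>(async image =>
        {
            await Task.Delay(10); // simulated upload
            return image;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
        var merger = CreateMerger();
        // Default DOP of 1 here, so the plain List is safe to mutate.
        var save = new ActionBlock<Page>(page => saved.Add(page.Name));

        splitter.LinkTo(process, link);
        process.LinkTo(merger, link);
        merger.LinkTo(save, link);

        splitter.Post(new Page("a", 3));
        splitter.Post(new Page("b", 2));
        splitter.Complete();
        await save.Completion; // completes after every page has been merged and saved
        return saved;
    }
}
```

One thing this sketch makes visible: a page with zero images never reaches the merger, because the splitter emits nothing for it, so such pages would need separate handling.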
svick
  • Thanks. That's quite similar to what I've been thinking already and uses Dataflow. But shouldn't you use ConcurrentDictionary? – Michael Logutov Oct 27 '14 at 06:34
  • @MichaelLogutov `ConcurrentDictionary` is not necessary here, since the `TransformManyBlock` has `MaxDegreeOfParallelism` set to 1. That means the `Dictionary` will never be accessed from more than one thread at any given time. – svick Oct 27 '14 at 08:36
  • Right. Forgot that the default options set DOP to 1. Thanks. – Michael Logutov Oct 27 '14 at 13:15
  • @svick I know it has been a long time, but could you please explain how the merging works? Also see [my question](https://stackoverflow.com/q/58714155/4649258). – Little geek Nov 07 '19 at 12:49
  • @svick love that! Thank you! – Gleb Sep 16 '21 at 21:21
0

The .NET platform has a nice interface that can represent parent-child relationships, the IGrouping<TKey, TElement> interface. It is simply an IEnumerable that also has a Key property. The key can be anything, and in this case it could be the Page that needs to be processed. The contents of the grouping could be the Images that belong to each page and need to be uploaded. This leads to the idea of a dataflow block that can process IGrouping<TKey, TInput> objects by processing each TInput independently, then aggregating the results per grouping, and finally outputting them as IGrouping<TKey, TOutput> objects. Below is an implementation of this idea:

public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();

    var actionBlock = new ActionBlock<Task<Task<TOutput>>>(taskTask =>
    {
        // An exception thrown by the following line would cause buggy behavior.
        // According to the documentation it should never fail.
        taskTask.RunSynchronously();
        return taskTask.Unwrap();
    }, dataflowBlockOptions);

    var completionCTS = new CancellationTokenSource();
    _ = actionBlock.Completion
        .ContinueWith(_ => completionCTS.Cancel(), TaskScheduler.Default);

    var transformBlock = new TransformBlock<IGrouping<TKey, TInput>,
        IGrouping<TKey, TOutput>>(async grouping =>
    {
        if (grouping == null) throw new InvalidOperationException("Null grouping.");
        var tasks = new List<Task<TOutput>>();
        foreach (var item in grouping)
        {
            // Create a cold task that will be either executed by the actionBlock,
            // or will be canceled by the completionCTS. This should eliminate
            // any possibility that an awaited task will remain cold forever.
            var taskTask = new Task<Task<TOutput>>(() => transform(grouping.Key, item),
                completionCTS.Token);
            var accepted = await actionBlock.SendAsync(taskTask);
            if (!accepted)
            {
                // The actionBlock has failed.
                // Skip the rest of the items. Pending tasks should still be awaited.
                tasks.Add(Task.FromCanceled<TOutput>(new CancellationToken(true)));
                break;
            }
            tasks.Add(taskTask.Unwrap());
        }
        TOutput[] results = await Task.WhenAll(tasks);
        return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping
    }, dataflowBlockOptions);

    // Cleanup
    _ = transformBlock.Completion
        .ContinueWith(_ => actionBlock.Complete(), TaskScheduler.Default);
    _ = Task.WhenAll(actionBlock.Completion, transformBlock.Completion)
        .ContinueWith(_ => completionCTS.Dispose(), TaskScheduler.Default);

    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, TOutput> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        (key, item) => Task.FromResult(transform(key, item)), dataflowBlockOptions);
}

This implementation consists of two blocks, a TransformBlock that processes the groupings and an internal ActionBlock that processes the individual items. Both are configured with the same user-supplied options. The TransformBlock sends to the ActionBlock the items to be processed one by one, then waits for the results, and finally constructs the output IGrouping<TKey, TOutput> with the following tricky line:

return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping

This compensates for the fact that the .NET platform currently has no publicly available class that implements the IGrouping interface. The GroupBy+Single combo does the trick, but it has the limitation that it doesn't allow the creation of empty IGroupings. In case this is an issue, creating a class that implements this interface is always an option. Implementing one is quite straightforward (here is an example).
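
As a rough illustration of the custom-class option, here is a minimal sketch. The class name `Grouping` and its constructor shape are assumptions chosen for this example; they are not part of the framework or of the linked example.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: a plain IGrouping implementation that, unlike the
// GroupBy+Single trick, also works when there are no elements.
public sealed class Grouping<TKey, TElement> : IGrouping<TKey, TElement>
{
    private readonly IReadOnlyList<TElement> _elements;

    public Grouping(TKey key, IEnumerable<TElement> elements)
    {
        if (elements == null) throw new ArgumentNullException(nameof(elements));
        Key = key;
        _elements = elements.ToList(); // snapshot, so later changes to the source are not observed
    }

    public TKey Key { get; }

    public IEnumerator<TElement> GetEnumerator() => _elements.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

With such a class, a line like `results.GroupBy(_ => grouping.Key).Single()` could be written as `new Grouping<TKey, TOutput>(grouping.Key, results)`, which also covers a page that has no images.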

Usage example of the CreateTransformGroupingBlock method:

var processPages = new TransformBlock<Page, IGrouping<Page, Image>>(page =>
{
    Image[] images = GetImagesFromDB(page);
    return images.GroupBy(_ => page).Single(); // Convert to IGrouping
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var uploadImages = CreateTransformGroupingBlock<Page, Image, Image>(async (page, image) =>
{
    await UploadImage(image);
    return image;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var savePages = new ActionBlock<IGrouping<Page, Image>>(grouping =>
{
    var page = grouping.Key;
    foreach (var image in grouping) SaveImageToDB(image, page);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

processPages.LinkTo(uploadImages);
uploadImages.LinkTo(savePages);

The type of the uploadImages variable is TransformBlock<IGrouping<Page, Image>, IGrouping<Page, Image>>. In this example the types TInput and TOutput are the same, because the images do not need to be transformed.

Theodor Zoulias
-1

Consider merging "Load images" and "Process images" into one TransformBlock block. That way you have no trouble keeping the images of a single page together.

In order to achieve your concurrency limit goal, use a SemaphoreSlim:

SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8);

//...

var page = ...; //TransformBlock<Page, MyPageAndImageDTO> block input
var images = GetImages(page);
ImageWithPage[] processedImages =
 images
 .AsParallel()
 .Select(i => {
    // Block until one of the 8 slots is free.
    processImageDopLimiter.Wait();
    try
    {
        return ProcessImage(i);
    }
    finally
    {
        // Always free the slot, even if ProcessImage throws.
        processImageDopLimiter.Release();
    }
 })
 .ToArray();
return new { page, processedImages };

This will lead to quite a few threads blocked waiting. You can use an asynchronous version of this processing if you like. This is immaterial to the question.
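
For reference, the asynchronous variant could look roughly like the sketch below. It is an illustration under assumptions, not production code: `ThrottledImages` and `ProcessAllAsync` are hypothetical names, and `processImage` stands for whatever asynchronous upload routine is actually used. The semaphore is static so that the limit of 8 applies across all pages, not per page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledImages
{
    // Shared by every page, so at most 8 images are processed at once overall.
    private static readonly SemaphoreSlim Limiter = new SemaphoreSlim(8);

    public static async Task<TResult[]> ProcessAllAsync<TImage, TResult>(
        IEnumerable<TImage> images, Func<TImage, Task<TResult>> processImage)
    {
        var tasks = images.Select(async image =>
        {
            // Waits without blocking a thread until a slot is free.
            await Limiter.WaitAsync();
            try
            {
                return await processImage(image);
            }
            finally
            {
                Limiter.Release(); // free the slot even if processing fails
            }
        }).ToList(); // materialize so that every task is started now

        // Results come back in the same order as the input images.
        return await Task.WhenAll(tasks);
    }
}
```

Inside the `TransformBlock` delegate this would be called as `await ThrottledImages.ProcessAllAsync(images, ProcessImageAsync)`, with `ProcessImageAsync` being a hypothetical async counterpart of `ProcessImage`. If any image fails, `Task.WhenAll` rethrows the exception, so the caller can catch it and skip saving that page.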

usr
  • I was hoping for a more Dataflow-style way of doing things, because I think that artificially limiting DOP by blocking a task is like cheating Dataflow, whose main goal is to orchestrate the workload. Could this lead to Dataflow not tuning itself correctly to the workload? – Michael Logutov Oct 25 '14 at 11:07
  • Good questions. Dataflow does not tune anything. It maxes out the DOP you specify, just like our custom solution does. Note that you can say `await semaphoreSlim.WaitAsync()` and you'll block zero threads while waiting. Internally, the semaphore has a queue and waiters are put in it. I can see no fundamental difference between limiting the DOP "manually" or by using Dataflow. Do you see any concrete disadvantage? Actually, you don't need Dataflow here at all because semaphores can do all of this. But it kind of makes sense to use Dataflow. – usr Oct 25 '14 at 14:12
  • Correct me if I'm wrong, but I think Dataflow already has the same kind of mechanism for capping DOP with blocks. So implementing another limiter means that we are fighting the Dataflow framework. And as you've mentioned, I can't use this solution with the workflow, because even with SemaphoreSlim I still can't use TransformManyBlock, since I still haven't figured out how to group images back by their page. – Michael Logutov Oct 25 '14 at 17:41
  • Right. Edited the code so that the page and the images stay grouped. What exactly is wrong with the parallelism behavior here? We are getting 8 concurrent threads for GetImages (because the transform block does that limiting). We are getting 8 concurrent threads for ProcessImage. That's exactly like your example is configured. – usr Oct 25 '14 at 17:50
  • So, why TransformManyBlock then? It gets a Page as input and produces many { page, image[] } results from each page? – Michael Logutov Oct 25 '14 at 18:35
  • I recommend a TransformBlock. My previous edit was incomplete. Fixed.; We are making progress only slowly. Are there any concerns remaining that prevent you from adopting this solution? – usr Oct 25 '14 at 18:53
  • Well, I guess it's the only solution currently. Another one is a custom scheduler, which (I think) will be similar at its core. – Michael Logutov Oct 25 '14 at 19:56
  • Yes, exactly. I wouldn't go that route because it requires you to manage your own threads. Remember, that with the async semaphore you don't block threads either – usr Oct 25 '14 at 20:02
  • Tbh I'm not very fond of your solution. Let's take 10 parallel pages, each with 10 images, and a DOP of 10 for image processing. With your solution we end up with 10+100 threads in the thread pool, only 20 of them actually working. We should not spend a thread if we know we can't do work in it yet. That's why I think the scheduler solution is better. But I'm yet to see a good implementation of a DOP-limited scheduler (the LimitedConcurrencyLevelTaskScheduler from MS TPL extras uses locks and other nasty stuff). – Michael Logutov Oct 26 '14 at 06:37
  • Then use async semaphore waiting. You don't block threads with that while waiting. I have proposed that three times now. – usr Oct 26 '14 at 07:42
  • I've never talked about Thread blocking. We're not working with the Thread class directly, so why are you even talking about it? Of course I know that we aren't blocking a thread; it's a given when working with Tasks unless you go back to the Thread API. I'm talking about wasting slots in the ThreadPool via tasks. You're creating more Task objects than needed. That's the problem with your solution. – Michael Logutov Oct 26 '14 at 09:33
  • When you await a semaphore you don't block a "threadpool slot". Maybe you should find out what await does. It is not a keyword to simply call Wait. My solution, if converted to async (which the sample code is not), uses just as many threads/tasks as dataflow. – usr Oct 26 '14 at 09:39
  • Correct me if I'm wrong, but I think AsParallel uses a Partitioner to create Task objects for each partition. And since we're not limiting it with anything (like DOP), it will create as many Task objects as there are elements in the source data stream. – Michael Logutov Oct 26 '14 at 11:35
  • That's why I say: convert this idea to an async approach. Such as manually creating one task per image found on the page (potentially 100s). Most tasks will immediately enqueue themselves into the semaphore and return. It's the same idea that the sync code (as posted here) uses, just made async. – usr Oct 26 '14 at 11:42
  • The code posted here is not meant for production. This is an illustration of an idea. – usr Oct 26 '14 at 11:53