In C#, I am using the Task Parallel Library (TPL) Dataflow blocks to download an image, process the image, and save the analysis results. A simplified version of the code is as follows.

var getImage = new TransformBlock<int, Image>(GetImage);
var proImage = new TransformBlock<Image, double>(ProcessImage);
var saveRes = new ActionBlock<double>(SaveResult);

var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };

getImage.LinkTo(proImage, linkOptions);
proImage.LinkTo(saveRes, linkOptions);

for (int x = 0; x < 1000000; x++)
    getImage.Post(x);
getImage.Complete();

saveRes.Completion.Wait();

This works as expected, except for memory usage. I expect int_x, image_x, and double_x to be disposed once the pipeline has processed that iteration. In other words, I expect every resource created during the execution of getImage, proImage, and saveRes for iteration x to be disposed when the last block completes its execution. However, this implementation keeps all the objects in memory until I exit the scope of the TPL pipeline.

Am I missing something? Is this the expected behavior of TPL? And is there any option I can set so that the resources are released at the end of each iteration?

Update

Following the suggestion in the comments, I rewrote the code using BufferBlock and SendAsync as follows. However, I do not think it leads to reclaiming the resources consumed by each task. Setting the BoundedCapacity only causes my program to halt at a point where I believe it has reached the BoundedCapacity limit.

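// TransformBlock and ActionBlock require ExecutionDataflowBlockOptions;
// BufferBlock accepts it too, because it derives from DataflowBlockOptions.
var blockOpts = new ExecutionDataflowBlockOptions()
{ BoundedCapacity = 100 };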

var imgBuffer = new BufferBlock<int>(blockOpts);

var getImage = new TransformBlock<int, Image>(GetImage, blockOpts);
var proImage = new TransformBlock<Image, double>(ProcessImage, blockOpts);
var SaveRes = new ActionBlock<double>(SaveResult, blockOpts);

var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };

imgBuffer.LinkTo(getImage, linkOptions);
getImage.LinkTo(proImage, linkOptions);
proImage.LinkTo(SaveRes, linkOptions);

for (int x = 0; x < 1000000; x++)
    await imgBuffer.SendAsync(x);
getImage.Complete();

SaveRes.Completion.Wait();
Dr. Strangelove
  • Have you tried configuring the dataflow blocks with a reasonable [`BoundedCapacity`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblockoptions.boundedcapacity)? Here is a related question: [TPL Dataflow - very fast producer, not so fast consumers OutOfMemory exception](https://stackoverflow.com/questions/40340274/tpl-dataflow-very-fast-producer-not-so-fast-consumers-outofmemory-exception). – Theodor Zoulias Jan 30 '22 at 23:51
  • In case you use the `BoundedCapacity` configuration, and the `ProcessImage`/`SaveResult` methods can throw exceptions, you are advised to take a look at this question: [TPL Dataflow exception in transform block with bounded capacity](https://stackoverflow.com/questions/21603428/tpl-dataflow-exception-in-transform-block-with-bounded-capacity). – Theodor Zoulias Jan 30 '22 at 23:53
  • Maybe I am misreading something; the `Image` type implements the `IDisposable` interface, and the `Dispose` method is never called even when setting the `BoundedCapacity` to a very small value (e.g., `10`). – Dr. Strangelove Jan 30 '22 at 23:59
  • The `ProcessImage` method seems like a good place to `Dispose` manually the image. I don't think that it's possible to be disposed automatically. – Theodor Zoulias Jan 31 '22 at 00:05
  • Am I correct in assuming that since the `Dispose` method is not called, the object is not "correctly" disposed? – Dr. Strangelove Jan 31 '22 at 00:14
  • Hamed yes, if you omit disposing an `IDisposable` object, the object will not be disposed. The resources it holds will not be released until it is finalized by the garbage collector (assuming that all references to the object have been dropped, so that the object can be recycled). – Theodor Zoulias Jan 31 '22 at 00:19
  • Thanks, then I guess setting `BoundedCapacity` does not mean that the items beyond the set limit are "disposed", i.e., their resources are not reclaimed by the GC, so I'm not sure how that can help limit memory usage?! – Dr. Strangelove Jan 31 '22 at 00:52
  • Hamed without configuring the `BoundedCapacity`, in case the `ProcessImage` takes longer than the `GetImage`, you may end up with hundreds of thousands of `Image` objects stored in the input buffer of the `proImage` block. – Theodor Zoulias Jan 31 '22 at 01:10
  • Thank you, then I guess it prevents adding new items before the set limit of the previous items is processed, though I'm not sure what happens to the processed items. It does not seem that their resources are reclaimed. Additionally, after `k` items (the `BoundedCapacity` limit) are processed, no new `x` is passed to `getImage`. So the buffer/queue of processed items is not freed up for new items?! Maybe I am missing something. – Dr. Strangelove Jan 31 '22 at 01:55
  • Using the `BoundedCapacity` configuration introduces what is known as "backpressure". This means that the producer that feeds the dataflow pipeline with messages, now may have to wait while the pipeline is at its maximum capacity. In practice what you have to do is to put aside the `Post` method, and use the [`SendAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblock.sendasync) instead. You can look at the links I posted earlier for examples. – Theodor Zoulias Jan 31 '22 at 02:15
  • Thanks for the suggestion, I changed the code accordingly, please see the updated question. Not sure what I'm missing here, but it seems the pipeline does not detect that the input items have made it through the pipeline, because not only does it not free up memory, it also halts after processing a certain number of items (I believe it is waiting for the buffer to empty). – Dr. Strangelove Jan 31 '22 at 06:58
  • Could you try `imgBuffer.Complete()` instead of `getImage.Complete()`? Also make sure to log any errors inside the `GetImage`, `ProcessImage` and `SaveResult`. In case of an error, [the pipeline will deadlock](https://stackoverflow.com/questions/21603428/tpl-dataflow-exception-in-transform-block-with-bounded-capacity). – Theodor Zoulias Jan 31 '22 at 07:23
  • I changed to `imgBuffer.Complete()`, added `try-catch` to all the methods, and added logging. No exception is thrown. Depending on the value given for the `BoundedCapacity`, it always gets stuck at a different item. – Dr. Strangelove Jan 31 '22 at 08:30
  • Hmm, it's not obvious why the pipeline would get stuck. It could be something related with the `Image` objects: they might be thread-affine, and manipulating an `Image` from two different threads might not be allowed. In that case you could consider passing byte arrays (`byte[]`) instead of images from block to block. Otherwise, we would need to see a minimal reproducible example, before being able to provide further help. – Theodor Zoulias Jan 31 '22 at 08:40
  • I guess I've narrowed down the issue: it is related to a limited number of handles I have (concurrent open database connections, to be exact). Since the objects are not auto-disposed, the handles are not released; disposing manually fixes the issue. So, that takes me back to my first question: how can I make sure dispose is **auto** called when the last block/action has executed on an input? – Dr. Strangelove Jan 31 '22 at 08:41
  • Honestly I am not a fan of passing `IDisposable` objects as messages in a dataflow pipeline. You can easily guarantee that a disposable object is disposed inside the method which created it, by using the `using` statement or a `try`/`finally` block. But if you let it escape outside, the responsibility for its disposal is no longer localized, and ensuring that it will be finally disposed becomes tricky. I have no easy solution to suggest to this problem. – Theodor Zoulias Jan 31 '22 at 08:49
  • That is a good suggestion, thank you! – Dr. Strangelove Jan 31 '22 at 08:54
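
The backpressure setup discussed in the comments above can be sketched as a small self-contained program. This is only an illustration under stated assumptions: `FakeImage` is a hypothetical stand-in for `Image` that counts live instances, the processing logic is a placeholder, and `BoundedPipeline` is an invented name. It feeds the head block with `SendAsync`, completes the head block, awaits the last block, and disposes each image inside the processing delegate.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for Image; tracks how many instances are alive.
sealed class FakeImage : IDisposable
{
    static readonly object Gate = new object();
    public static int Live;   // created and not yet disposed
    public static int Peak;   // highest value Live has reached
    public int Id { get; }

    public FakeImage(int id)
    {
        Id = id;
        lock (Gate) { Live++; if (Live > Peak) Peak = Live; }
    }

    public void Dispose() { lock (Gate) Live--; }
}

static class BoundedPipeline
{
    public static async Task<(int saved, int peak, int live)> RunAsync(int count, int capacity)
    {
        // ExecutionDataflowBlockOptions is what TransformBlock/ActionBlock accept.
        var opts = new ExecutionDataflowBlockOptions { BoundedCapacity = capacity };
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        int saved = 0;

        var getImage = new TransformBlock<int, FakeImage>(x => new FakeImage(x), opts);
        var proImage = new TransformBlock<FakeImage, double>(image =>
        {
            // Dispose the image as soon as it has been processed.
            using (image) return image.Id * 0.5;
        }, opts);
        var saveRes = new ActionBlock<double>(_ => saved++, opts);

        getImage.LinkTo(proImage, link);
        proImage.LinkTo(saveRes, link);

        // SendAsync waits while the head block is full (backpressure).
        for (int x = 0; x < count; x++)
            await getImage.SendAsync(x);

        getImage.Complete();        // complete the block that receives the input
        await saveRes.Completion;   // completion propagates down the links

        return (saved, FakeImage.Peak, FakeImage.Live);
    }

    static async Task Main()
    {
        var r = await RunAsync(1000, 10);
        Console.WriteLine($"saved={r.saved} peak={r.peak} live={r.live}");
    }
}
```

In this sketch the number of live images should stay small relative to the total number of inputs, because each bounded block holds only a limited number of messages at a time and every image is disposed in `proImage`.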

1 Answer

is this the expected behavior of TPL?

Yes. It doesn't root all the objects (they are available for garbage collection and finalization), but it does not dispose them, either.

and is there any option to set so the resources are released at the end of each iteration?

No.

how can I make sure dispose is auto called when the last block/action executed on an input?

To dispose objects, your code should call Dispose. This is fairly easily done by modifying ProcessImage or wrapping it in a delegate.

If ProcessImage is synchronous:

var proImage = new TransformBlock<Image, double>(image => { using (image) return ProcessImage(image); });

or if it's asynchronous:

var proImage = new TransformBlock<Image, double>(async image => { using (image) return await ProcessImage(image); });
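
As a hedged sketch of the same idea, the wrapping could be factored into small helpers so that it can be reused for several blocks. The names `DisposingDelegates`, `DisposeAfter`, `DisposeAfterAsync`, and `TrackedImage` are hypothetical and introduced only for illustration; `ProcessImage` here is a placeholder for the question's method.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for Image that records whether Dispose was called.
sealed class TrackedImage : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}

static class DisposingDelegates
{
    // Wraps a synchronous body so its input is disposed afterwards,
    // even when the body throws.
    public static Func<TIn, TOut> DisposeAfter<TIn, TOut>(Func<TIn, TOut> body)
        where TIn : IDisposable
        => input => { using (input) return body(input); };

    // Same for an asynchronous body; disposal happens after the await.
    public static Func<TIn, Task<TOut>> DisposeAfterAsync<TIn, TOut>(Func<TIn, Task<TOut>> body)
        where TIn : IDisposable
        => async input => { using (input) return await body(input); };
}

static class DisposeAfterDemo
{
    // Placeholder for the question's ProcessImage.
    static double ProcessImage(TrackedImage image) => 42.0;

    public static async Task<(double result, bool disposed)> RunAsync()
    {
        var proImage = new TransformBlock<TrackedImage, double>(
            DisposingDelegates.DisposeAfter<TrackedImage, double>(ProcessImage));

        var image = new TrackedImage();
        proImage.Post(image);
        double result = await proImage.ReceiveAsync();

        // The delegate has returned by now, so the using block has run.
        return (result, image.Disposed);
    }

    static async Task Main()
    {
        var r = await RunAsync();
        Console.WriteLine($"result={r.result} disposed={r.disposed}");
    }
}
```

The explicit type arguments on `DisposeAfter` are needed when a method group is passed. Note that this only covers images that actually reach the delegate; images dropped from a faulted block's input buffer are not disposed by it.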
Stephen Cleary
  • The problem is that in case the `proImage` block fails, all `Image` objects stored in its input buffer are going to be silently dropped. These objects will not get disposed, and the managed resources they hold are not going to be released before the objects are finalized by the garbage collector (at some random moment in the future). – Theodor Zoulias Jan 31 '22 at 14:22
  • Good point. My answer does not handle error situations well. I suppose one could add them to a collection disposable, but then you'd end up with a potentially ever growing list... – Stephen Cleary Jan 31 '22 at 15:51
  • Another idea would be to make the blocks bulletproof, by wrapping the messages in some yet to be invented `Try`+`Using` monad, something like the [`Try`](https://github.com/StephenCleary/Try) on steroids. :-) – Theodor Zoulias Jan 31 '22 at 16:11
  • I'm a bit tired ATM - I can't seem to figure out how that would work. If the using monad input messages are dropped, they still wouldn't be eagerly disposed, right? – Stephen Cleary Jan 31 '22 at 20:37
  • Also, I am not sure how I should error out? e.g., how do you call `Fault` from the block body? – Dr. Strangelove Jan 31 '22 at 20:42
  • Stephen yes, the idea is that the pipeline would be fail-safe, so no messages would be dropped. Every `TryUsing` message that would go in the pipeline, it would travel all the way to the exit point. This means that the pipeline would not be cancelable though, which is a problem. – Theodor Zoulias Feb 01 '22 at 04:08
  • @TheodorZoulias: Ah, of course. The Try monad would prevent faulting blocks completely. – Stephen Cleary Feb 01 '22 at 06:37
  • @Hamed: A block will fault if the block body throws an exception. – Stephen Cleary Feb 01 '22 at 06:38
  • That is what I tried, but in my tests, that brings the block, and consequently the whole pipeline, to a halt. – Dr. Strangelove Feb 01 '22 at 16:40
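
The fail-safe idea from the comments above (blocks that never fault, so no buffered message is dropped undisposed) could look roughly like the following. This is a hand-rolled sketch, not the `Try` library mentioned in the thread: `Outcome<T>`, `CountedImage`, and `FailSafePipeline` are hypothetical names, and `ProcessImage` is a placeholder that fails for some inputs on purpose.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for Image; counts instances not yet disposed.
sealed class CountedImage : IDisposable
{
    public static int Undisposed;
    public int Id { get; }
    public CountedImage(int id) { Id = id; Interlocked.Increment(ref Undisposed); }
    public void Dispose() => Interlocked.Decrement(ref Undisposed);
}

// Hand-rolled outcome type: a value, or the exception raised instead of it.
readonly struct Outcome<T>
{
    public T Value { get; }
    public Exception Error { get; }
    public bool Ok => Error == null;
    Outcome(T value, Exception error) { Value = value; Error = error; }
    public static Outcome<T> Success(T value) => new Outcome<T>(value, null);
    public static Outcome<T> Failure(Exception error) => new Outcome<T>(default, error);
}

static class FailSafePipeline
{
    // Placeholder processing step that fails for every third image.
    static double ProcessImage(CountedImage image)
    {
        if (image.Id % 3 == 0) throw new InvalidOperationException($"bad image {image.Id}");
        return image.Id;
    }

    public static async Task<(int ok, int failed, int undisposed)> RunAsync(int count)
    {
        var opts = new ExecutionDataflowBlockOptions { BoundedCapacity = 4 };
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        int ok = 0, failed = 0;

        var getImage = new TransformBlock<int, CountedImage>(x => new CountedImage(x), opts);
        var proImage = new TransformBlock<CountedImage, Outcome<double>>(image =>
        {
            using (image)   // always dispose, whether processing succeeds or not
            {
                // Catch the error so the block keeps running instead of faulting.
                try { return Outcome<double>.Success(ProcessImage(image)); }
                catch (Exception ex) { return Outcome<double>.Failure(ex); }
            }
        }, opts);
        var saveRes = new ActionBlock<Outcome<double>>(o =>
        {
            if (o.Ok) ok++; else failed++;   // a real pipeline would log o.Error
        }, opts);

        getImage.LinkTo(proImage, link);
        proImage.LinkTo(saveRes, link);

        for (int x = 0; x < count; x++)
            await getImage.SendAsync(x);
        getImage.Complete();
        await saveRes.Completion;

        return (ok, failed, CountedImage.Undisposed);
    }

    static async Task Main()
    {
        var r = await RunAsync(9);
        Console.WriteLine($"ok={r.ok} failed={r.failed} undisposed={r.undisposed}");
    }
}
```

Because errors travel through the pipeline as ordinary messages, every image reaches the delegate that disposes it. As noted in the comments, the trade-off is that such a pipeline cannot simply be cancelled or faulted midway without reintroducing the problem of dropped messages.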