
Here is a simplified scenario, where the user wants to download and process some data:

private ConcurrentDictionary<int, (string path, string name)> _testDictionary
    = new ConcurrentDictionary<int, (string path, string name)>();
public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    foreach (var (id, path, name) in properties)
    {
        _testDictionary.TryAdd(id, (path, name));
    }
    await CreatePipeline(properties);
    //after returning I would like to check if _testDictionary contains any elements,
    //and what is their status
}

All incoming items are registered in the ConcurrentDictionary, and then a TPL Dataflow pipeline is called to do the downloading and processing:

public async Task CreatePipeline(List<(int id, string path, string name)> properties)
{
    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => { return data.id; },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    var resultsBlock = new ActionBlock<int>((data) =>
    {
        _testDictionary.TryRemove(data, out _);
        //or
        //_testDictionary.AddOrUpdate(...);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    downloadBlock.LinkTo(resultsBlock,
        new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var item in properties)
    {
        await downloadBlock.SendAsync(item);
    }
    resultsBlock.Complete();
    await resultsBlock.Completion;
}

At the end, in the results block, the item is removed from _testDictionary (or updated) according to how the processing went. My silly question is: if I set MaxDegreeOfParallelism = 1 for all the blocks that make up my pipeline and make sure there is never more than one pipeline running at the same time, do I really need a ConcurrentDictionary for this, or would a simple Dictionary be sufficient? I am concerned that the pipeline could be executed on a different thread, and accessing a simple Dictionary from there could lead to problems.

  • Are you asking if a dictionary is thread safe? No, it's not. Static methods are thread safe. See MSDN documentation please. – CodingYoshi Jul 08 '20 at 12:01
  • If you are interacting with it from a single thread, `Dictionary` is fine. If not, you need `ConcurrentDictionary`. If you aren't sure, use `ConcurrentDictionary`. – mjwills Jul 08 '20 at 12:33
  • 2
    As a side note, you probably intended to write `downloadBlock.Complete();` instead of `resultsBlock.Complete();`. – Theodor Zoulias Jul 09 '20 at 05:54
  • In case you are interested in a recursive block implementation, where each processed element can produce more elements to be processed, there is one [here](https://stackoverflow.com/questions/26130168/how-to-mark-a-tpl-dataflow-cycle-to-complete/62297218#62297218). – Theodor Zoulias Jul 09 '20 at 15:36

2 Answers


Dataflow

From the _testDictionary point of view, your StartDownload acts as a producer and your CreatePipeline as a consumer. The Add and Remove calls are separated into two different functions, which is why you needed to make that variable a class-level field.

What if CreatePipeline contained both calls and returned all the unprocessed elements?

public async Task<Dictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(
            prop => prop.id, 
            prop => (prop.path, prop.name)));

    // var downloadBlock = ...;

    var resultsBlock = new ActionBlock<int>(
        (data) => unprocessed.TryRemove(data, out _), options);

    //...

    downloadBlock.Complete();
    await resultsBlock.Completion;

    return unprocessed.ToDictionary(
        dict => dict.Key,
        dict => dict.Value);
}

Ordering

If ordering does not matter, then you could consider rewriting the TransformBlock population logic like this:

await Task.WhenAll(properties.Select(downloadBlock.SendAsync));

ImmutableDictionary

If you want to make sure that the returned unprocessed items can't be modified by other threads, you can take advantage of ImmutableDictionary.

So, if we put everything together it might look like this:

public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    var unprocessedProperties = await CreatePipeline(properties);
    foreach (var property in unprocessedProperties)
    {
        //TODO
    }
}

public async Task<ImmutableDictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
    var options = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1};

    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(
            prop => prop.id, 
            prop => (prop.path, prop.name)));

    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id, options);

    var resultsBlock = new ActionBlock<int>(
        (data) => unprocessed.TryRemove(data, out _), options);

    downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });
    await Task.WhenAll(properties.Select(downloadBlock.SendAsync));

    downloadBlock.Complete();
    await resultsBlock.Completion;

    return unprocessed.ToImmutableDictionary(
        dict => dict.Key, 
        dict => dict.Value); 
}

EDIT: Reflecting the new requirements

As the OP pointed out, the main reason behind the dictionary is to provide the ability to extend the to-be-processed queue while the processing is still happening.

In other words, gathering and processing the to-be-processed items is not a one-time thing but rather a continuous activity.

The good thing is that you can get rid of the _testDictionary and the resultsBlock entirely. All you need to do is continuously Post or SendAsync new data to the TransformBlock. The completion is awaited in a separate method (StopDownload).

private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;

public MyAwesomeClass()
{
    var transform = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    // Drain the (here unused) output, otherwise the block can never reach completion.
    transform.LinkTo(DataflowBlock.NullTarget<int>());

    downloadBlock = transform;
}

public void StartDownload(List<(int id, string path, string name)> properties)
{
    // Starts sending the properties, but does not await the send operations.
    _ = properties.Select(downloadBlock.SendAsync).ToList();

    // You can await the send operations here if you wish.
}

public async Task StopDownload()
{
    downloadBlock.Complete();
    await downloadBlock.Completion;
}

This structure can easily be modified to inject a BufferBlock in front of the transform, to smooth out the load:

private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
private readonly Task pipelineCompletion;

public MyAwesomeBufferedClass()
{
    var transform = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    var buffer = new BufferBlock<(int id, string path, string name)>(
        new DataflowBlockOptions { BoundedCapacity = 100 });

    buffer.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });

    // Drain the (here unused) output, otherwise the transform can never reach completion.
    transform.LinkTo(DataflowBlock.NullTarget<int>());

    downloadBlock = buffer;
    // The buffer's own Completion only signals that it has forwarded everything,
    // so track the completion of the last block in the chain instead.
    pipelineCompletion = transform.Completion;
}

public void StartDownload(List<(int id, string path, string name)> properties)
{
    _ = properties.Select(downloadBlock.SendAsync).ToList();
}

public async Task StopDownload()
{
    downloadBlock.Complete();
    await pipelineCompletion;
}
Peter Csala
  • Thank you for such a detailed answer, Peter. The reason why I wanted to make `_testDictionary` class level variable is that while pipeline is running it would be possible to add new items to `_testDictionary` and when `await CreatePipeline` would return it would check if there are any new elements added and run `CreatePipeline` again till no new elements would appear in `_testDictionary` upon returning from pipeline execution. I decided to omit this part of code in sample because it seemed irrelevant to my question. – niks Jul 09 '20 at 10:54
  • @niks That's game-changer information :D. It requires a different architecture. Your `StartDownload` should just call `SendAsync`/`Post` against an `ITargetBlock`. It might be your current `TransformBlock` or a `BufferBlock` in front of the TB to smooth the processing requests. You don't need the `_testDictionary` at all, just use the pipeline directly. Should I edit my answer accordingly? – Peter Csala Jul 09 '20 at 11:13
  • I am sorry for that. This is how StackOverflow works for me - I ask a question that I think is important, it turns out I was thinking in a completely wrong direction, and I end up with a completely different solution :D Much better solution, I have to say. Well, it would be very nice of you if you could edit your answer to illustrate how you would implement this solution. – niks Jul 09 '20 at 11:25
  • Thank you for the update! As I understand this solution keeps the pipeline up and running. Just give it food. However I have a question - I usually put `await downloadBlock.Completion;` in a `try-catch` but in this case if one of the items that needs to be processed by the pipeline, throws an error, and I have not yet called `await downloadBlock.Completion;` that error simply won't appear in `catch` ? And if I call `await downloadBlock.Completion;` that is the end of pipeline and I can not pass any new items to it which kind of defeats the whole purpose of this solution? – niks Jul 09 '20 at 15:55
  • @niks Good question. Fortunately I have good news for you, error handling can be made quite simple in this case. Whenever an exception occurs inside the `TransformBlock`, you have several possibilities. If it is a **transient failure**, for example due to an unreliable network, then you can simply `Post` the exact same item into the queue again and try to process it later. Or you can create a dead-letter queue with a `BufferBlock`, and link an `ActionBlock` to it to create log entries about the failed items (a rough sketch follows after this comment thread). [Other options](https://devblogs.microsoft.com/pfxteam/exception-handling-in-tpl-dataflow-networks/) – Peter Csala Jul 10 '20 at 06:48
  • Oh, I am a bit confused right now. I have been working on my `DataFlow` implementation for a while now, I have changed it multiple times, and this solution means doing it one more time. I have to do some testing now. Can I ask your honest and most critical opinion about my previous approach - is using a `ConcurrentDictionary` as a "counter" that registers `waiting/processed/failed` items, and then running the pipeline in a `while` loop until there are no more `waiting` items in the dictionary, considered a "novice" approach or bad in any other way? – niks Jul 10 '20 at 11:42
  • @niks In my opinion the true power of Dataflow is the following: you create a mesh (or pipeline) and let it do its job just by feeding it. There are so many different building blocks that can help you create a fully resilient data processing pipeline. I would consider using a separate collection if I needed to examine / post-process elements outside of the mesh. Just to feed it again does not seem to me a good reason. I suggest reading this (old-but-gold) [intro whitepaper](https://www.microsoft.com/en-us/download/details.aspx?id=14782) – Peter Csala Jul 10 '20 at 12:02
  • Yes, it makes sense when you put it this way. For some reason I was under impression that I should create new pipeline every now and then when new batch of information to be processed comes in. So you say it is completely fine to create one pipeline and simply let it work till whole application is terminated at the end of the day? – niks Jul 10 '20 at 13:16
  • @niks Yes. Please check [this](https://stackoverflow.com/questions/23961620/multiple-short-lived-tpl-dataflows-versus-single-long-running-flow) or [that](https://michaelscodingspot.com/c-job-queues-part-3-with-tpl-dataflow-and-failure-handling/) – Peter Csala Jul 10 '20 at 13:51
  • Thank you for your opinion and excellent links, Peter! This has changed the way I think about meshes/pipelines! I can now think of few more specific questions to ask here!:D – niks Jul 13 '20 at 14:51
  • @niks I'm glad that the provided information helped you. I would suggest closing this question, and I'm happy to hear about specific questions in other SO threads. – Peter Csala Jul 13 '20 at 15:10
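
Regarding the error-handling comment above: here is a rough, hypothetical sketch of the dead-letter-queue idea. All names, including `DownloadAsync`, are made up for illustration and are not part of the original answer. Items whose processing throws are forwarded to a `BufferBlock`, and an `ActionBlock` linked to it simply logs them:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class DownloadPipeline
{
    private readonly TransformBlock<(int id, string path), (int id, bool ok)> _download;
    private readonly BufferBlock<(int id, string path)> _deadLetters =
        new BufferBlock<(int id, string path)>();

    public DownloadPipeline()
    {
        _download = new TransformBlock<(int id, string path), (int id, bool ok)>(
            async item =>
            {
                try
                {
                    await DownloadAsync(item.path);     // placeholder for the real download step
                    return (item.id, true);
                }
                catch (Exception)
                {
                    // Park the failed item instead of faulting the whole block.
                    await _deadLetters.SendAsync(item);
                    return (item.id, false);
                }
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // A simple consumer that just logs whatever lands in the dead-letter queue.
        var logger = new ActionBlock<(int id, string path)>(
            item => Console.WriteLine($"Failed to process item {item.id} ({item.path})"));
        _deadLetters.LinkTo(logger, new DataflowLinkOptions { PropagateCompletion = true });

        // Drain the transform's output; in a real mesh a downstream block would consume it.
        _download.LinkTo(DataflowBlock.NullTarget<(int id, bool ok)>());
    }

    private static Task DownloadAsync(string path) => Task.Delay(10);   // dummy stand-in

    public Task<bool> EnqueueAsync((int id, string path) item) => _download.SendAsync(item);
}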

Yes, if the structure of your code ensures that the dictionary cannot be accessed by multiple threads concurrently, then a normal Dictionary is sufficient. In case you are concerned about the visibility of the internal state of the dictionary, and the possibility of some thread seeing a stale state at some point, this is not a problem because:

TPL includes the appropriate barriers when tasks are queued and at the beginning/end of task execution, so that values are appropriately made visible.

(Source)
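
For illustration, here is a minimal sketch (hypothetical names, not the code from the question) of the access pattern described above: a plain Dictionary that is populated by the caller, mutated only by a single-threaded ActionBlock, and read again by the caller only after awaiting the block's Completion, so no two threads ever touch it at the same time:

using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PlainDictionaryDemo
{
    public static async Task<Dictionary<int, string>> RunAsync(List<(int id, string path)> items)
    {
        var pending = new Dictionary<int, string>();    // plain Dictionary, no locking

        var worker = new ActionBlock<(int id, string path)>(
            item => pending.Remove(item.id),            // the only code touching the dictionary while the block runs
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Populated on the calling thread, before any message is posted.
        foreach (var item in items)
            pending.Add(item.id, item.path);

        foreach (var item in items)
            await worker.SendAsync(item);

        worker.Complete();
        await worker.Completion;    // after this await the block's writes are visible to the caller

        return pending;             // safe to inspect the leftovers on the calling thread
    }
}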

Theodor Zoulias
  • Hi Theodor, thank you for the answer. Yes, I understand that a normal `Dictionary` is not thread safe. I was worried about the pipeline itself. Let's say I create and call the TPL pipeline from the UI thread - is it guaranteed to always execute all its blocks on the same UI thread, given that I set `MaxDegreeOfParallelism = 1`? – niks Jul 09 '20 at 10:15
  • @niks no, this is not guaranteed. The code inside the Dataflow blocks will run by default on `ThreadPool` threads. If you want it to run on the UI thread you must configure each block with a specific [`TaskScheduler`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblockoptions.taskscheduler): the [`TaskScheduler.FromCurrentSynchronizationContext`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler.fromcurrentsynchronizationcontext) (see the sketch at the end of this thread). – Theodor Zoulias Jul 09 '20 at 10:33
  • Oh, this means that without any specific configuration I can not ensure that `_testDictionary` is not accessed by multiple threads. I better stick with `ConcurrentDictionary` then. Less worries. Thank you! – niks Jul 09 '20 at 10:59
  • @niks there is no problem with accessing a `Dictionary` from multiple threads, provided that the pattern of access is not concurrent, and that memory barriers are inserted at the switch points. But, yes, if using a `ConcurrentDictionary` can give you peace of mind in exchange for a tiny performance hit, I would agree that it's a good trade-off. :-) – Theodor Zoulias Jul 09 '20 at 12:25
  • Thank you for your help, Theodor. I have decided to close this question and accept Peter's answer. Your post answers the question directly, but I feel that Peter's answer has more in-depth ideas on why my approach of using a special `List` to register items is flawed in the first place. Thank you once again, up-vote from me! – niks Jul 14 '20 at 12:47
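
For illustration, a minimal sketch of the configuration mentioned in the comment above, assuming a WPF/WinForms UI thread with a SynchronizationContext (`statusLabel` is a hypothetical UI control):

// Must be called on the UI thread, so that the UI SynchronizationContext is captured.
var uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

var uiBoundBlock = new ActionBlock<int>(
    id => statusLabel.Text = $"Processed {id}",     // safe: the delegate runs on the UI thread
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 1,
        TaskScheduler = uiScheduler                 // pin the block's work to the captured UI context
    });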