4

.Net TPL experts,

Note: Cannot use DataFlow library; no add-ons allowed.

I have four tasks as shown in the diagram below:

[Diagram: task_1 → BlockingCollection → task_2 / task_3 → results_queue → task_4]

  • task_1 (data_producer) -> reads records from a large file (>500000 records) and adds records to a BlockingCollection

  • task_2, task_3 (data_consumers) -> Each of these tasks takes records from the BlockingCollection, performs some (network-related) work on each record, and when complete adds a record to the results queue. Order of processing is NOT important.

  • task_4 (results processor) -> Takes records from results_queue and writes to an output file.

I then wait for the tasks to complete, i.e.:

Task.WhenAll( t1, t2, t3, t4 )

So I have one producer task, MULTIPLE consumer tasks, and one task for saving the results.

My question is:

HOW do I notify task 4 when tasks 2 and 3 are completed, so that task 4 also knows when to end?

I have found many examples that "move" data from ONE task to another in a linear "pipeline" fashion, but have not found any examples that illustrate the above; that is, how to notify task 4 when task 2 and 3 are complete, so that it will know when to complete as well.

My initial thought is to "register" task 2 and 3 with task 4 and simply monitor the state of each registered task -- when task 2 and 3 are no longer running, then task 4 can stop (if the results queue is also empty).
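For what it's worth, here is a minimal, self-contained sketch of that "register and monitor" idea, using a continuation on Task.WhenAll; the int payloads and the stand-in consumer bodies are placeholders, not my real tasks:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var results = new BlockingCollection<int>();

// Stand-ins for task_2 and task_3 (the real consumers would loop over the input queue).
Task t2 = Task.Run(() => results.Add(2));
Task t3 = Task.Run(() => results.Add(3));

// "Register" t2 and t3: once both have completed, no more results can arrive,
// so mark the results queue complete.
Task monitor = Task.WhenAll(t2, t3)
                   .ContinueWith(_ => results.CompleteAdding());

// task_4: GetConsumingEnumerable ends once CompleteAdding has been called
// and the queue has been drained, so task_4 knows when to stop.
Task t4 = Task.Run(() =>
{
    int sum = 0;
    foreach (int r in results.GetConsumingEnumerable())
        sum += r;
    Console.WriteLine(sum); // prints 5
});

await Task.WhenAll(monitor, t4);
```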

Thanks in advance.

bdcoder

3 Answers

0

If you use a BlockingCollection for the results_queue as well, you can implement these notifications using the BlockingCollection.IsCompleted and BlockingCollection.IsAddingCompleted properties. The process is:

  • task1 calls BlockingCollection.CompleteAdding() on the input collection when there are no more records in the input file.
  • task2 and task3 regularly check the IsCompleted property on the input collection. This property is true once the input collection is empty and the producer has called CompleteAdding(). When it becomes true, tasks 2 and 3 are finished; a continuation on both tasks then calls CompleteAdding() on the results queue exactly once (having either consumer call it directly could make the other consumer's Add() throw).
  • task4 can process records from results_queue as they arrive, or wait for the results queue's IsAddingCompleted property to become true and then start processing. task4's work is finished when IsCompleted is true on the results queue.

Edit: I am not sure if you are familiar with the IsCompleted and IsAddingCompleted properties. They are different, and they fit your case well. I don't think you need any other synchronization besides the BlockingCollection properties. Please ask if additional explanation is needed!

    BlockingCollection<int> inputQueue;
    BlockingCollection<int> resultQueue;

    public void StartTasks()
    {
        inputQueue = new BlockingCollection<int>();
        resultQueue = new BlockingCollection<int>();

        Task task1 = Task.Run(() => Task1());
        Task task2 = Task.Run(() => Task2_3());
        Task task3 = Task.Run(() => Task2_3());
        Task[] tasksInTheMiddle = new Task[] { task2, task3 };
        Task waiting = Task.WhenAll(tasksInTheMiddle).ContinueWith(x => resultQueue.CompleteAdding());
        Task task4 = Task.Run(() => Task4());

        //Waiting for tasks to finish
    }
    private void Task1()
    {
        while(true)
        {
            int? input = ReadFromInputFile();
            if (input != null)
            {
                inputQueue.Add((int)input);
            }
            else
            {
                inputQueue.CompleteAdding();
                break;
            }
        }
    }

    private void Task2_3()
    {
        while (!inputQueue.IsCompleted)
        {
            // TryTake avoids the race where the collection is marked
            // complete between the IsCompleted check and a blocking Take().
            if (inputQueue.TryTake(out int input, Timeout.Infinite))
            {
                resultQueue.Add(input);
            }
        }
    }

    private void Task4()
    {
        while (!resultQueue.IsCompleted)
        {
            if (resultQueue.TryTake(out int result, Timeout.Infinite))
            {
                WriteToOutputFile(result);
            }
        }
    }
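For reference, the difference between the two properties can be seen in a tiny standalone sketch: IsAddingCompleted only says the producer is done, while IsCompleted additionally requires the collection to be empty:

```csharp
using System;
using System.Collections.Concurrent;

var queue = new BlockingCollection<int>();
queue.Add(1);
queue.CompleteAdding();

Console.WriteLine(queue.IsAddingCompleted); // True  - no more items will be added
Console.WriteLine(queue.IsCompleted);       // False - one item is still queued

queue.Take();
Console.WriteLine(queue.IsCompleted);       // True  - adding complete AND empty
```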
Thomas
  • Not clear on how the above would work. Task 2 and task 3 could still be adding records to the results queue even after the end of the input file is reached. What I really need to know is when task 2 and task 3 have finished (run to completion) -- so I was thinking of monitoring the status of those tasks to be sure all results are done. – bdcoder Nov 23 '16 at 18:53
  • I've read Scott Chamberlain's comment now. The best approach is probably to combine my solution with his. There is no point in delaying task4's final processing until tasks 2 and 3 finish if you can do that in parallel. But from his solution it is better to use another task (a continuation on Task.WhenAll) to set CompleteAdding on the results queue - that way you know for sure when those tasks are complete. In the meantime task4 can call Take() on the results queue and concurrently write to the output file (while tasks 2 and 3 are still writing). – Thomas Nov 24 '16 at 00:25
  • To accomplish the same in Scott's solution, just place his method call `Stage2MonitorStart(t2,t3);` so that it doesn't block the call to Task4Start. Of course, that only works if your implementation can write to the output file before the processing tasks in the middle are finished. – Thomas Nov 24 '16 at 00:51
0

The task you're describing fits well into the TPL Dataflow library, a small add-on to TPL itself (it can be included in a project via a NuGet package; .NET 4.5 is supported). You can easily set up a flow something like this (code updated based on the comments to use BroadcastBlock):

var buffer = new BroadcastBlock<string>(s => s); // a cloning function is required
var consumer1 = new TransformBlock<string, string>(s => { /* your action here for a string */ return s; });
var consumer2 = new TransformBlock<string, string>(s => { /* your action here for a string */ return s; });
var resultsProcessor = new ActionBlock<string>(s => { /* your logging logic here */ });

I'm not sure about your solution's logic, so I assumed you simply operate on strings here. You should asynchronously send all incoming data to the first block (if you Post your data while the buffer is overloaded, the message will be discarded), and link the blocks to each other, like this:

buffer.LinkTo(consumer1, new DataflowLinkOptions { PropagateCompletion = true });
buffer.LinkTo(consumer2, new DataflowLinkOptions { PropagateCompletion = true });
// Don't propagate completion from both consumers into one target: the first
// consumer to finish would complete resultsProcessor too early. Complete it
// only when both consumers are done.
consumer1.LinkTo(resultsProcessor);
consumer2.LinkTo(resultsProcessor);
Task.WhenAll(consumer1.Completion, consumer2.Completion)
    .ContinueWith(_ => resultsProcessor.Complete());

foreach (var s in IncomingData)
{
    await buffer.SendAsync(s);
}
buffer.Complete();

If both of your consumers should process all items, then you should use the BroadcastBlock (there can be some issues around guaranteed delivery). The other option is to filter the messages between the consumers (for example, by the remainder of the message id divided by the number of consumers), but in that case you should also link one more consumer to catch any messages that for whatever reason weren't consumed.
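The filtering option can be sketched with the predicate overload of LinkTo; the int messages, the parity predicates, and the NullTarget fallback below are illustrative assumptions, not part of the original question:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow NuGet package

var buffer = new BufferBlock<int>();
var evenConsumer = new ActionBlock<int>(i => Console.WriteLine($"even: {i}"));
var oddConsumer  = new ActionBlock<int>(i => Console.WriteLine($"odd: {i}"));

var opts = new DataflowLinkOptions { PropagateCompletion = true };
buffer.LinkTo(evenConsumer, opts, i => i % 2 == 0); // each item goes to exactly one consumer
buffer.LinkTo(oddConsumer,  opts, i => i % 2 != 0);
// Safety net: catch anything no predicate matched, so the buffer never stalls.
buffer.LinkTo(DataflowBlock.NullTarget<int>());

for (int i = 0; i < 4; i++) buffer.Post(i);
buffer.Complete();

await Task.WhenAll(evenConsumer.Completion, oddConsumer.Completion);
```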

As you can see, completion is propagated through the pipeline, so after this you can simply attach to the .Completion task property of the resultsProcessor:

resultsProcessor.Completion.ContinueWith(t => { /* Processing is complete */ });
VMAtm
  • Note that BufferBlock will only offer the item to the first consumer, which is not intended by the OP. In order to overcome this, you should link the BufferBlock to a TransmitBlock and link the TransmitBlock to each of the consumers. – Eyal Perry Nov 23 '16 at 17:55
  • Also note that SendAsync should be awaited. – Eyal Perry Nov 23 '16 at 17:57
  • @EyalPerry `TransmitBlock` - which block exactly do you mean? I've never seen that one. Do you mean a `BroadcastBlock`? – VMAtm Nov 23 '16 at 20:00
  • Yes, that.. sorry mate. long day :) – Eyal Perry Nov 23 '16 at 20:00
  • The reason why I didn't suggest the `BroadcastBlock` is that it offers current message to all consumers, which can be unexpected by OP. Maybe the blocks can be linked with some predicate, will add that to the answer. – VMAtm Nov 23 '16 at 20:03
  • If I understood OP correctly, this is exactly what he intended. He did say that both consumers receive the items. – Eyal Perry Nov 23 '16 at 20:07
0

This is a bit of an extension of what Thomas already said.

By using a BlockingCollection you can call GetConsumingEnumerable() on it and treat it like a normal foreach loop. This lets your tasks end "naturally". The only thing you need to add is one extra task that watches tasks 2 and 3 and, when they end, calls CompleteAdding on the results collection.

private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>();
private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>();

Task RunProcess()
{
    Task1Start();
    var t2 = Stage2Start();
    var t3 = Stage2Start();
    Stage2MonitorStart(t2,t3);
    return Task4Start();
}

public void Task1Start()
{
    Task.Run(()=>
    {
        foreach(var item in GetFileSource())
        {
            var processedItem = Process(item);
            _stageOneBlockingCollection.Add(processedItem);
        }
        _stageOneBlockingCollection.CompleteAdding();
    });
}

public Task Stage2Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage2(item);
            _stageTwoBlockingCollection.Add(processedItem);
        }
    });
}

void Stage2MonitorStart(params Task[] tasks)
{
    //Once all tasks complete mark the collection complete adding.
    Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding());
}

public Task Task4Start()
{
    return Task.Run(()=>
    {
        foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable())
        {
            var processedItem = ProcessStage4(item);
            WriteToOutputFile(processedItem);
        }
    });
}
Scott Chamberlain
  • This does that exact behavior. All tasks run concurrently, notice the `Task.Run` in all the functions. The Task that is returned from `RunProcess` is a task that represents when the entire process is complete. – Scott Chamberlain Nov 23 '16 at 23:49
  • Yup -- looks like you're the winner -- thank you so much !! – bdcoder Nov 23 '16 at 23:51