-1

I've created a Producer/Consumer program which uses N threads of invocations ( currently only 1).

void Main()
{
    var pcQ = new PCQueue(1); // Maximum concurrency of 1

    foreach (int item in Enumerable.Range(1, 5))
    {
        pcQ.Enqueue(async () =>
      {
          await Task.Delay(1000); //<-------simulation of work
          Console.WriteLine(item);
      });
    }

}

public class PCQueue
{
    private BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
    public Task Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken))
    {
        var task = new Task(action, cancelToken);
        _taskQ.Add(task);
        return task;
    }
    public PCQueue(int workerCount)
    {

        for (int i = 0; i < workerCount; i++)
            Task.Run(Consume);
    }

    async Task Consume()
    {
        foreach (var task in _taskQ.GetConsumingEnumerable())
            try
            {
                    task.RunSynchronously();
            }
            catch (InvalidOperationException)
            {
            }
    }

}

The output is :

enter image description here

The problem is that after a second, I see all results at once.

A new item is being fetched from the blocking collection before the previous item has completed.

This is happening because of this line (without awaiting):

task.RunSynchronously();

Question: How can I await this task to complete before another one is fetched. I'm expecting to see a new line after every second.

Royi Namir
  • 144,742
  • 138
  • 468
  • 792
  • 2
    I don't think your problem is what you think it is. your `Enqueue` method takes an `Action` (an `Action` has no return type) meaning that any `async` action you pass to it will have an `async void` signature instead of having an awaitable `Task` handle. As far as I know, no matter what you'll do after that it will always have the fire and forget mechanism associated with `async void`. So to work around it instead of asking for an `Action` you might be able to ask for a `Func`, store those and then `await` them when they're up. –  Apr 11 '21 at 12:00
  • I don't understand. Why do you need multiple consumers if you do operations in sequence? – tymtam Apr 11 '21 at 12:01
  • @tymtam currently there is jsut one thread that "fetch" items. but it can be changed to n consumers. – Royi Namir Apr 11 '21 at 12:02
  • Your image doesn't even compile, but you're correct you're making the same kind of mistake as with asking for an `Action`. Change `await task.RunSynchronously();` to `await task;` (`task.RunSynchronously` returns void and therefor doesn't provide an awaitable `Task` handle) –  Apr 11 '21 at 12:07
  • You can await `task.Start(); await task;`, but that still won't ensure that tasks are started one after another. – tymtam Apr 11 '21 at 12:11
  • 1
    As a side note the `PCQueue` class looks to me like an attempt to reinvent the [`ActionBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1) from the [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) library. – Theodor Zoulias Apr 11 '21 at 13:19
  • @TheodorZoulias it's from Joe http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT – Royi Namir Apr 11 '21 at 13:20
  • 1
    I see. Joseph Albahari is one of my favorite authors, but his book is now slightly outdated. The `BlockingCollection` class is a powerful tool, but not as powerful as the TPL Dataflow blocks. You can see [here](https://stackoverflow.com/questions/21424084/task-sequencing-and-re-entracy/62882637#62882637) an example of using an `ActionBlock` as the underlying engine of a class similar to the `PCQueue`, that schedules asynchronous work. Features like cancellation and max degree of parallelism can be delegated transparently to the underlying `ActionBlock`. – Theodor Zoulias Apr 11 '21 at 13:34
  • Small hint https://devblogs.microsoft.com/pfxteam/await-synchronizationcontext-and-console-apps/ – aepot Apr 11 '21 at 14:27

1 Answers1

1

You can await the task with task.Start(); await task; but this will still not solve the problem because you'd await the outer task that wraps the actual job.

Once you change the collection to keep Task and pass Func<Task> the inner job cab be awaited:

private BlockingCollection<Task<Task>> _taskQ = new BlockingCollection<Task<Task>>();
            public Task Enqueue(Func<Task> action, CancellationToken cancelToken = default(CancellationToken))
            {
                var task = new Task<Task>(action, cancelToken);
                _taskQ.Add(task);
                return task;
            }(...)

task.Start();
var job = await task;
await job;

This would then for one cosumer execute sequentially.

Going Func<Task

We can simplify the code if we go with Func<Task>:

static void Main()
{
    var pcQ = new PCQueue(1); // Maximum concurrency of 1

    foreach (int item in Enumerable.Range(1, 5))
    {
        pcQ.Enqueue(async (CancellationToken token) =>
        {
            Console.WriteLine($"Starting {item}");
            await Task.Delay(100, token); //<-------simulation of work
            Console.WriteLine($"Ending {item}");
        });
    }
    Console.ReadLine();

}

public class PCQueue
{
    private BlockingCollection<Func<CancellationToken, Task>> _taskQ = new BlockingCollection<Func<CancellationToken, Task>>();
    public void Enqueue(Func<CancellationToken, Task> action) =>  _taskQ.Add(action);

    public PCQueue(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
            Task.Run(Consume);
    }

    async Task Consume()
    {
        var cancellationToken = ...
        foreach (var f in _taskQ.GetConsumingEnumerable())
                await f(cancellationToken);
    }

        }
    }
}
tymtam
  • 31,798
  • 8
  • 86
  • 126
  • That's why Enqueu takes `Func` not action. – tymtam Apr 11 '21 at 12:27
  • I think BlockingCollection> should take Func – Royi Namir Apr 11 '21 at 12:45
  • also - _taskQ.Add(action); is missing the cancellation token data – Royi Namir Apr 11 '21 at 12:46
  • Yes, it can. But first let's make you code work you need to change `new Task(` to `new Task(action – tymtam Apr 11 '21 at 12:46
  • now it's working. thanks. i wtill dont understand why `BlockingCollection` didnt work ? I've enqueued tasks. so why it didnt work ? – Royi Namir Apr 11 '21 at 12:48
  • The key is that you endued a task that itself was a wrapper around another task. Think outer task and inner task. – tymtam Apr 11 '21 at 12:56
  • Can the cancellation token be used in your answer ? – Royi Namir Apr 11 '21 at 13:06
  • I updated the answer with cancellation tokens. Where the token is generated and managed it app specific, but this shows how it could be passed in and used in the work method. – tymtam Apr 11 '21 at 13:25
  • The core misclarification is that `pcQ.Enqueue(async () => { await Task.Delay(1000); //<-------simulation of work Console.WriteLine(item); });` can be sent **BOTH** to `Enqueue(Action action...` and `Enqueue(Func action`.... – Royi Namir Apr 11 '21 at 13:32