
My program starts a lot of tasks, and these tasks can start other tasks. But when the program exits (at the end of the Main method), all running tasks are stopped in the middle of their work.

When the program closes, I need the shutdown to wait for all tasks. To do this, I register every started task and, as the last instruction, wait for all registered tasks:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class HostedTask
{
    private readonly static ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();

    public static void Run(Action action)
    {
        var task = Task.Factory.StartNew(action, TaskCreationOptions.LongRunning);
        _tasks.Enqueue(task);
    }

    public static void Wait()
    {
        // Tasks enqueued while waiting (e.g. child work) are picked up by later iterations.
        while (!_tasks.IsEmpty)
        {
            if (_tasks.TryDequeue(out Task task))
            {
                task.Wait();
            }
        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Hello World!");
        for (int i = 0; i < 100; i += 10)
        {
            LongBackgroundWork(i);
        }
        HostedTask.Wait();
    }

    static void LongBackgroundWork(int id)
    {
        HostedTask.Run(() =>
        {
            Console.WriteLine(id + " Begin");
            Thread.Sleep(TimeSpan.FromSeconds(10));
            Console.WriteLine(id + " End");
            for (var i = id + 1; i < id + 10; i++)
                ChildWork(i);
        });
    }

    static void ChildWork(int id)
    {
        HostedTask.Run(() =>
        {
            Console.WriteLine(id + " Begin");
            Thread.Sleep(TimeSpan.FromSeconds(2));
            Console.WriteLine(id + " End");
        });
    }
}

This strategy has some problems:

  • The collection is never cleaned up while the program runs, so it can grow indefinitely
  • Every Task creation has to be replaced
  • It doesn't handle ContinueWith
  • It doesn't handle async/await

Do you have another strategy or idea?

Edit: Made the example more complex so that it generates child work.

vernou
  • Step 1: Go "Async all the way". – Fildor Aug 27 '20 at 13:36
  • You can and should mark your main method as async (https://stackoverflow.com/a/44254451/2598770) – Rand Random Aug 27 '20 at 13:37
  • What's the point of this class? What problem is it trying to solve? Is it something left over from .NET 4, when `async/await` wasn't available? You can wait for multiple tasks to finish with `await Task.WhenAll()`, or `Task.WaitAll()` if you don't mind blocking. There are far better constructs though. E.g., `ActionBlock` can process messages posted to it with 1 or more worker tasks *without* blocking. Even better, it can be combined in a pipeline of steps using other TPL Dataflow classes. – Panagiotis Kanavos Aug 27 '20 at 13:39
  • Tasks are *not* threads, they are promises, so they typically don't need tracking. `await Task.Run(()=>...)` is just fine and doesn't need to check if the task is finished. If anything, `HostedTask` looks like an attempt to build the ThreadPool that processes tasks, using tasks as if they were threads. – Panagiotis Kanavos Aug 27 '20 at 13:41
  • If you create a `Thread` instead of a `Task`, you can set its `IsBackground` to `false` to prevent the process from exiting until that thread has terminated. (Warning: This is a good way to make the user have to go to Task Manager to close a faulty application... but then, so is waiting for all tasks to complete in Main()) – Matthew Watson Aug 27 '20 at 13:43
  • @Matthew Watson, this works perfectly. It's just the old thread syntax. – vernou Aug 27 '20 at 14:10
  • @Fildor, this idea is great, but it needs a total rework. It's an old, legacy program. – vernou Aug 27 '20 at 14:16
  • So what are you trying to do? Modernize a vintage application, just not too much? In that case, I second Matthew: stick with Threads. It may even be cleaner than any hybrid monster creation ;D – Fildor Aug 27 '20 at 14:23
  • It's an old batch job that became too slow. I parallelized it with the TPL and gained a lot of performance. But I ran into this problem, and this solution works... but I dislike it. – vernou Aug 27 '20 at 16:25
  • You could use an `ActionBlock` with a high DOP to do what you want with a couple of lines - the high DOP would ensure each message got its own worker task: `var block = new ActionBlock<TMessage>(msg => Process(msg), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 99 });`. Waiting for all of them would be two lines of code: `block.Complete(); await block.Completion;` – Panagiotis Kanavos Aug 27 '20 at 16:26
  • +1 for the `System.Threading.Tasks.Dataflow` library. It needs a little rework, but it's exactly what I need. Thanks. – vernou Aug 27 '20 at 16:33
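A minimal sketch of the Dataflow pipeline suggested in the comments, assuming the `System.Threading.Tasks.Dataflow` NuGet package is referenced. A `TransformManyBlock` does the long work for each parent id and emits its child ids; a linked `ActionBlock` does the child work. With `PropagateCompletion`, completing the head block eventually completes the tail, so one `await` covers parents and children. The degree of parallelism, the sleep times and the `ChildCount` field are illustrative assumptions, not part of the original program.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class Program
{
    // Number of child items processed; only used to check the result.
    public static int ChildCount;

    public static async Task Main(string[] args)
    {
        // Assumed degree of parallelism; tune it for the real workload.
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 };

        // Head block: does the long work for a parent id, then emits its child ids.
        var head = new TransformManyBlock<int, int>(id =>
        {
            Console.WriteLine(id + " Begin");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            Console.WriteLine(id + " End");
            return Enumerable.Range(id + 1, 9); // id+1 .. id+9, like ChildWork in the question
        }, options);

        // Tail block: does the short child work.
        var tail = new ActionBlock<int>(id =>
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(200));
            Console.WriteLine(id + " End");
            Interlocked.Increment(ref ChildCount);
        }, options);

        // Once head has completed and emitted everything, tail is completed too.
        head.LinkTo(tail, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 100; i += 10)
            head.Post(i);

        // Stop accepting new parents, then wait until every child has been processed.
        head.Complete();
        await tail.Completion;
    }
}
```

Nothing is stored per task here: the blocks own their internal queues, and the shutdown wait is just `head.Complete()` followed by `await tail.Completion`.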

2 Answers

I'm not sure exactly what you are trying to do, but maybe something like the code below suits you better.

It ensures that every time a task ends, it is removed from the list (which is protected by a lock), and that you can wait for all tasks to end.

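using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class HostedTask
{
    private readonly static List<Task> _tasks = new List<Task>();
    private static readonly object taskLocker = new object();

    public static async Task Run(Action action)
    {
        var task = Task.Factory.StartNew(action, TaskCreationOptions.LongRunning);

        lock (taskLocker)
            _tasks.Add(task);

        try
        {
            await task;
        }
        finally
        {
            // Remove the task even if the action threw.
            lock (taskLocker)
                _tasks.Remove(task);
        }
    }

    public static void Wait()
    {
        while (true)
        {
            List<Task> anys;
            lock (taskLocker)
            {
                // ToList() takes the snapshot while the lock is held; a lazy Where()
                // would be enumerated later, outside the lock, while other threads modify the list.
                anys = _tasks.Where(t => !t.IsCompleted).ToList();
            }

            if (anys.Count == 0)
                return;

            // Tasks started while waiting are picked up by the next iteration.
            Task.WhenAll(anys).Wait();
        }
    }
}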

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Hello World!");
        for (int i = 0; i < 100; i++)
        {
            LongBackgroundWork(i);
        }
        ShortBackgroundWork(-1);

        HostedTask.Wait();
    }

    static Task LongBackgroundWork(int id)
    {
        return HostedTask.Run(() =>
        {
            Console.WriteLine(id + " Begin");
            Thread.Sleep(TimeSpan.FromSeconds(10));
            Console.WriteLine(id + " End");
        });
    }

    static Task ShortBackgroundWork(int id)
    {
        return HostedTask.Run(() =>
        {
            Console.WriteLine(id + " Begin");
            Thread.Sleep(TimeSpan.FromSeconds(1));
            Console.WriteLine(id + " End");
        });
    }
}
vernou
  • Doing something without understanding why is *not* a good idea. In this case, why `Task.WhenAll().Wait()` instead of `Task.WaitAll()`? And why write *any* of this code instead of e.g. using `ThreadPool.QueueUserWorkItem`? Why use explicit locking instead of a `ConcurrentQueue`? – Panagiotis Kanavos Aug 27 '20 at 15:49
  • The problem with ConcurrentQueue is that it is a queue: FIFO (first in, first out). We need to be able to remove a task from the collection at any time (when it terminates), which is the reason for the lock around the list. I won't disagree that there are a hundred solutions to this. I just gave mine, making minor (not radical) changes. If you have a better solution, please state it. – Efthymios Kalyviotis Aug 27 '20 at 15:59
  • Also, Task.WaitAll() might create a problem if a new task is added to the list while you are waiting (it probably won't be picked up). I preferred this more extreme approach to be on the safe side. – Efthymios Kalyviotis Aug 27 '20 at 16:01
  • @EfthymiosKalyviotis, I edited my question to trap you... but your edit handles the new case. OK for the List with an explicit lock, but why `Task.WhenAll().Wait()` instead of `Task.WaitAll()`? – vernou Aug 27 '20 at 16:06
  • `Task.WaitAll() might create a problem` and so would `Task.WhenAll().Wait()`. This still blocks the thread; there's no difference. There are no triggers involved. If anything, `Task.WhenAll()` creates *another* Task that eventually gets blocked. – Panagiotis Kanavos Aug 27 '20 at 16:11
  • @Vernou It is the same thing. You can use `Task.WaitAll(anys.ToArray())`. `WaitAll` takes an array as a parameter, whereas `WhenAll` takes an enumerable (which suited me better). – Efthymios Kalyviotis Aug 27 '20 at 16:12
  • No, that's not the difference. `Task.WhenAll()` is misused here. Its job is to *await asynchronously* for some tasks to complete, by using `await Task.WhenAll()`. – Panagiotis Kanavos Aug 27 '20 at 16:13
  • Do you have any better suggestion for solving the problem? I would like to hear it. – Efthymios Kalyviotis Aug 27 '20 at 16:15
  • @Vernou frankly, if all you wanted is to know whether all threads completed, the easiest way would be to increment e.g. a semaphore when the thread starts and decrement it when it exits. At the end, you could check the count to ensure all threads completed. No need for queues. Or you could use `Interlocked.Increment` and `Interlocked.Decrement` on a global counter. – Panagiotis Kanavos Aug 27 '20 at 16:16
  • Yes, there's no need for that kind of "task queue" at all. The OP is trying to "modernize" thread-based code by using tasks as threads. That's not going to work - the TPL *already* uses pools and work queues internally. Putting a pool and queue over a pool and queue is only going to complicate things. – Panagiotis Kanavos Aug 27 '20 at 16:18
  • To give a more concrete example, we need to actually know the scenario. Parallelism, concurrency, asynchronous operations are very different from each other and are already implemented using different namespaces in .NET: PLINQ, DataFlow, `async/await` etc. So I'd pick the best library for the given scenario – Panagiotis Kanavos Aug 27 '20 at 16:22
  • If I had to process stuff as it arrives, for a long time, I'd use DataFlow, and ActionBlock, or Channels, to construct a pipeline and feed the data to it. When the time came to close the application, I'd call `headBlock.Complete(); await tail.Completion;` to wait for all pending tasks to complete. – Panagiotis Kanavos Aug 27 '20 at 16:24
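A minimal sketch of the counter idea from the comments above (`Interlocked.Increment`/`Interlocked.Decrement` on a global counter), combined with a monitor so the final wait blocks instead of polling. `WorkTracker`, `Run`, `WaitAll` and `Pending` are hypothetical names. Because it stores a count rather than the tasks, nothing accumulates; because `Run` takes a `Func<Task>`, async work stays counted until its returned task really finishes. It assumes child work is started from inside its parent, so the count cannot drop to zero while a child is still pending.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: counts outstanding work instead of storing tasks.
public static class WorkTracker
{
    private static int _pending;
    private static readonly object _gate = new object();

    public static int Pending => Volatile.Read(ref _pending);

    public static Task Run(Func<Task> work)
    {
        // Count the item before it starts, so a parent that starts children
        // keeps the count above zero until the children are counted too.
        Interlocked.Increment(ref _pending);
        return Task.Run(async () =>
        {
            try
            {
                await work();
            }
            finally
            {
                // Wake up WaitAll when the last outstanding item finishes.
                if (Interlocked.Decrement(ref _pending) == 0)
                    lock (_gate) Monitor.PulseAll(_gate);
            }
        });
    }

    // Blocks until no work is outstanding.
    public static void WaitAll()
    {
        lock (_gate)
        {
            while (Volatile.Read(ref _pending) != 0)
                Monitor.Wait(_gate);
        }
    }
}

public static class Program
{
    public static int ChildrenDone;

    public static void Main(string[] args)
    {
        for (int i = 0; i < 100; i += 10)
        {
            int id = i; // copy: a for-loop variable is shared across iterations
            WorkTracker.Run(async () =>
            {
                await Task.Delay(500); // stands in for long async work
                for (var c = id + 1; c < id + 10; c++)
                {
                    WorkTracker.Run(async () =>
                    {
                        await Task.Delay(100); // stands in for child work
                        Interlocked.Increment(ref ChildrenDone);
                    });
                }
            });
        }

        WorkTracker.WaitAll();
        Console.WriteLine("All work finished");
    }
}
```

Exceptions thrown by the work are left in the `Task` returned by `Run`; the count is still decremented in `finally`, so a faulted item cannot make `WaitAll` hang.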

Just for fun, here is a foreground task scheduler. It executes every task on a new foreground thread, so the process keeps running until all tasks have finished:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class ForegroundTaskScheduler : TaskScheduler
{
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        // Tasks are handed to their own thread immediately, so nothing stays queued.
        return Enumerable.Empty<Task>();
    }

    protected override void QueueTask(Task task)
    {
        // A new Thread is a foreground thread by default (IsBackground == false),
        // so the process cannot exit while it runs.
        new Thread(() => TryExecuteTask(task)).Start();
    }

    protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false; // No inlining: every task gets its own foreground thread
    }
}

public static class ForegroundTask
{
    public static TaskScheduler Scheduler { get; }
    public static TaskFactory Factory { get; }

    static ForegroundTask()
    {
        Scheduler = new ForegroundTaskScheduler();
        Factory = new TaskFactory(Scheduler);
    }
}

Usage:

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Hello World!");
        for (int i = 0; i < 100; i += 10)
        {
            LongBackgroundWork(i);
        }
    }

    static void LongBackgroundWork(int id)
    {
        ForegroundTask.Factory.StartNew(() =>
        {
            Console.WriteLine(id + " Begin");
            Thread.Sleep(TimeSpan.FromSeconds(5));
            Console.WriteLine(id + " End");
        }, TaskCreationOptions.LongRunning).ContinueWith(t =>
        {
            for (var i = id + 1; i < id + 10; i++)
                ChildWork(i);
        }, ForegroundTask.Scheduler);
    }

    static void ChildWork(int id)
    {
        ForegroundTask.Factory.StartNew(() =>
        {
            Console.WriteLine(id + " Begin");
            Thread.Sleep(TimeSpan.FromSeconds(3));
            Console.WriteLine(id + " End");
        }, TaskCreationOptions.LongRunning);
    }
}

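However, this doesn't work with async/await: a task's foreground thread ends at the first `await` that actually yields, so while the awaited operation is pending, no foreground thread keeps the process alive.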
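One possible workaround, sketched under the assumption that tying up one thread per operation is acceptable: start a foreground thread that blocks on the whole async operation, so the process stays alive until the operation's returned `Task` completes. `ForegroundAsync.Run` is a hypothetical helper, not part of the scheduler above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ForegroundAsync
{
    // Hypothetical helper: runs an async delegate and keeps a foreground thread
    // blocked on it, so the process cannot exit before the operation completes.
    public static Task Run(Func<Task> work)
    {
        var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var thread = new Thread(() =>
        {
            try
            {
                work().GetAwaiter().GetResult(); // blocks this foreground thread until the async work ends
                done.SetResult(true);
            }
            catch (Exception ex)
            {
                done.SetException(ex); // surface the failure through the returned Task
            }
        });
        thread.IsBackground = false; // already the default for a new Thread; set explicitly for clarity
        thread.Start();
        return done.Task;
    }
}

public static class Program
{
    public static int Finished;

    public static void Main(string[] args)
    {
        for (int i = 0; i < 100; i += 10)
        {
            int id = i; // copy: a for-loop variable is shared across iterations
            ForegroundAsync.Run(async () =>
            {
                Console.WriteLine(id + " Begin");
                await Task.Delay(TimeSpan.FromSeconds(1));
                Console.WriteLine(id + " End");
                Interlocked.Increment(ref Finished);
            });
        }
        // Main returns immediately; the blocked foreground threads keep the process alive.
    }
}
```

The continuation after each `await` still runs on a thread-pool thread; it is the blocked foreground thread, not the continuation, that prevents the process from exiting.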

vernou