For each new message, the previous task (if available) should be stopped and a new one started.

SOLUTION

  • The only solution I can see to the problems described below is to create a CancellationTokenSource (CTS) in MessageHandler::Start and pass its token to HandleAsync, which will propagate it to all services. However, since _provider.StopAsync() must be called on cancellation, OnTaskStopping::_service.StopAsync() must also be called. I therefore end up using both CTS.Cancel and OnTaskStopping. Is it a good approach to mix a CancellationToken with a Stop method?

PROBLEMS

  • Let's assume MessageHandler::Start::Task.Run hasn't started yet, a new message arrives, and MessageHandler::Start::Task.WhenAll is called first. That means OnTaskStopping() -> _service.StopAsync() -> _cts.Cancel() is called. When MessageHandler::Start::Task.Run eventually runs, it creates a new CancellationTokenSource and thus discards the one that was just canceled. Therefore, the foreach will not be canceled.
  • If I move _cts = new CancellationTokenSource(); (call this line NEW_CTS) to the end of MyService.StartAsync, _cts.Cancel() might be called right after the NEW_CTS line. Then, when the new MessageHandler::Start::Task.Run starts, MyService.StartAsync will finish immediately because the token is already canceled.
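
The first problem can be reproduced in isolation. The following is a minimal sketch, not part of the original code: the class name LostCancellationDemo is hypothetical, and the three statements stand in for the MyService constructor, StopAsync and StartAsync running in that order.

```csharp
using System.Threading;

// Hypothetical demo class (not from the original code).
public static class LostCancellationDemo
{
    // Returns true if the loop in StartAsync would still see the cancellation.
    public static bool Run()
    {
        var cts = new CancellationTokenSource(); // constructor creates the source
        cts.Cancel();                            // StopAsync runs before StartAsync
        cts = new CancellationTokenSource();     // StartAsync replaces the source
        return cts.IsCancellationRequested;      // the earlier Cancel() is lost
    }
}
```

Run() returns false: the Cancel() call was made on a source that no longer backs the field, so a loop that polls the field never stops.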

CODE

1    public abstract class MessageHandler
2    {
3        private readonly BlockingCollection<string> _pendingMessages;
4        public void Start()
5        {
6            _task = Task.Run(async () =>
7            {
8                Task handlerTask = null;
9                try
10                {
11                     // BlockingCollection is temporary. I'll switch to a channel to get an async producer/consumer
12                     foreach (var msg in _pendingMessages.GetConsumingEnumerable(_cancellationTokenSource.Token))
13                     {
14                         try
15                         {
16                             // stop previous task
17                             if(handlerTask != null)
18                             {
19                                 await OnTaskStopping();
19.5                               await handlerTask;
20                             }
21    
22                             handlerTask = Task.Run(async () => await HandleAsync(msg));
23                         }
24                         catch (Exception ex)
25                         {
26                            ...
27                         }
28                     }
29                }
30                catch { } // OperationCanceledException
31            });
32        }
    
33        protected abstract Task HandleAsync(string msg);
34        protected abstract Task OnTaskStopping();
35    }
    
36    public class MyMessageHandler : MessageHandler
37    {
38        private readonly MyService _service;
    
39        public MyMessageHandler (MyService service)
40        {
41            _service = service;
42        }
    
43        protected override async Task HandleAsync(string msg)
44        {
45           ...
    
46            await _service.StartAsync(...);
47        }
    
48        protected override async Task OnTaskStopping()
49        {
50            await _service.StopAsync();
51        }
52    }
    
53    public class MyService
54    {
55        private CancellationTokenSource _cts;
56        private readonly IDevicesProvider _provider;
57       
58        public MyService()
59        { 
60           _cts = new CancellationTokenSource(); 
61        }
     
62        public async Task StartAsync(...)
63        {
64            _cts = new CancellationTokenSource();
    
65            foreach (var item in Items)
66            {
67                if(_cts.IsCancellationRequested)
68                   return;
                
69                ...
70            }
    
71            //_cts = new CancellationTokenSource(); 
72        }
    
73        public async Task<bool> StopAsync()
74        {
75            _cts.Cancel();
              
76            // THIS MUST HAPPEN
77            return await _provider.StopAsync();
78        }
79    } 
theateist
  • Take a look at the `CancelableExecution` class in [this answer](https://stackoverflow.com/questions/6960520/when-to-dispose-cancellationtokensource/61681938#61681938 "When to dispose CancellationTokenSource?"). It might be exactly what you want. – Theodor Zoulias Jan 23 '23 at 10:19
  • Or just use `BackgroundService` to manage start / stop. – Jeremy Lakeman Jan 24 '23 at 04:27
  • @TheodorZoulias, I didn't understand how to apply that in my case. Could you please give an example? – theateist Jan 24 '23 at 07:21
  • I have in mind something like this: `protected override async Task HandleAsync(string msg) { await _cancelableExecution.RunAsync(async ct => ...); }` – Theodor Zoulias Jan 24 '23 at 09:54

1 Answer

You can improve this in a couple of ways.

The first is not to access _cts constantly in StartAsync. Instead, read the token from _cts once into a local / parameter.
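
To illustrate why reading the token once helps, here is a minimal sketch under stated assumptions: the class name TokenCaptureDemo is hypothetical, and the local variable cts stands in for the _cts field. A token copied into a local stays tied to the source it came from, even if the field is later replaced.

```csharp
using System.Threading;

// Hypothetical demo class (not from the question's code).
public static class TokenCaptureDemo
{
    public static (bool Captured, bool Field) Run()
    {
        var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;   // read once, as a local
        var original = cts;
        cts = new CancellationTokenSource();   // "field" replaced by a later start
        original.Cancel();                     // cancel the run that owns 'token'
        // The captured token sees the request; the replaced field does not.
        return (token.IsCancellationRequested, cts.IsCancellationRequested);
    }
}
```

Run() returns (true, false), which is why the loop should poll the captured token rather than the field.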

I also find it's worth saving the Task returned from StartAsync, so your StopAsync can wait for the cancellation to complete.

Something like:

public class MyService
{
    private CancellationTokenSource? _cts;
    private Task? _task;
    private readonly IDevicesProvider _provider;
 
    public async Task StartAsync(...)
    {
        await StopAsync(); // Or throw if we're already started?

        _cts = new CancellationTokenSource();
        _task = RunAsync(_cts.Token);
        await _task; 
    }

    private async Task RunAsync(CancellationToken token)
    {
        foreach (var item in Items)
        {
            if (token.IsCancellationRequested)
               return;
            
            ...
        }
    }

    public async Task<bool> StopAsync()
    {
        bool result = false;
        if (_cts != null)
        {
            _cts.Cancel();
            try
            {
                await _task;
            }
            catch (OperationCanceledException) { }

            try
            {
                // THIS MUST HAPPEN
                result = await _provider.StopAsync();
            }
            finally
            {
                _cts = null;
                _task = null;
            }
        }

        return result;
    }
}

Once you've got that far, it might make more sense to move the await _provider.StopAsync() into the RunAsync:

public class MyService
{
    private CancellationTokenSource? _cts;
    private Task<bool>? _task;
    private readonly IDevicesProvider _provider;
 
    public async Task StartAsync(...)
    {
        await StopAsync(); // Or throw if we're already started?

        _cts = new CancellationTokenSource();
        _task = RunAsync(_cts.Token);
        await _task; 
    }

    private async Task<bool> RunAsync(CancellationToken token)
    {
        bool result = false;
        try
        {
            foreach (var item in Items)
            {
                if (token.IsCancellationRequested)
                   break;
            
                ...
            }
        }
        catch (OperationCanceledException) { }
        finally
        {
            // A finally block cannot contain a return statement, so store the result.
            result = await _provider.StopAsync();
        }

        return result;
    }

    public async Task<bool> StopAsync()
    {
        bool result = false;
        if (_cts != null)
        {
            _cts.Cancel();
            try
            {
                result = await _task;
            }
            finally
            {
                _cts = null;
                _task = null;
            }
        }

        return result;
    }
}

Of course in both cases your StartAsync only completes after the task has fully completed (or been stopped) -- you can remove the await _task to avoid this.

canton7
  • I think I have the same issue here. Consider the following case: `Task.Run` (line 22) is called with `msg1` but has not started yet. At that moment a new `msg2` arrives. As a result, line 19 is awaited, meaning `OnTaskStopping() -> StopAsync` is called and completes right away since `_cts` is still `null`. Next, it will continue to await on line 19 for the previous `handlerTask` (with msg1) to finish. Let's say that only now your `StartAsync` Task gets executed. That means it will start processing `msg1` (`RunAsync`) when it should have canceled it and started processing `msg2`. – theateist Jan 24 '23 at 04:09
  • I hope my example in the comment above is clear. I think I need some kind of synchronization and/or change how I stop previous Task in `MessageHandler.Start`. – theateist Jan 24 '23 at 04:11
  • This class isn't thread-safe. If you need to handle multiple messages arriving on different threads, you'll need some locking. If two messages arrive at the same point on different threads, do you really want to discard one of them at random, and keep the other? – canton7 Jan 24 '23 at 08:38
  • Why random? When a new message arrives, it should cancel the previous one if it's being processed and only process the last one that arrived. I'm not sure the problem is with sync in MyService. I think I need to do something in `MessageHandler` on line 19 – theateist Jan 24 '23 at 22:19
  • Random because if two messages arrive at roughly the same time, it's indeterminate as to which Task executes first – canton7 Jan 25 '23 at 06:36
  • How would you suggest to change the code to guarantee that when new message arrives the previous one will be stopped if it has started or not start if it hasn't started and just start the new one? – theateist Jan 25 '23 at 07:33
  • What should happen if a new message arrives while the previous message is being stopped? – canton7 Jan 25 '23 at 08:20
  • What happens if another new message arrives while that one is waiting? Should the waiting one be thrown away, even though it hasn't started yet? – canton7 Jan 25 '23 at 08:23
  • Let's say `Msg1` is being stopped and `Msg2` arrives. `Msg2` calls Stop and should wait for `Msg1` to stop (`Msg1` might not even have the chance to start) and then start processing `Msg2`. If `Msg3` arrives it will still be in the queue. So, once `Msg1` stopped and `Msg2` is scheduled to be processed (line 22), `Msg3` will be read (line 12) from the queue and repeat as above - stop `Msg2`, wait for it to stop and then start `Msg3` – theateist Jan 25 '23 at 08:36
  • So `Msg2` is started, and then immediately stopped? – canton7 Jan 25 '23 at 09:09
  • Yes. If `Msg2` is running/scheduled to run and `Msg3` arrives, `Msg2` should be stopped (regardless of how long it has been running, if at all). All messages are of the same group type. I have another message handler that handles messages of a different type. – theateist Jan 25 '23 at 09:16
  • Actually, taking another look at your initial comment... Change `handlerTask = Task.Run(async () => await HandleAsync(msg));` to `handlerTask = HandleAsync(msg);` on line 22. There's no need to move the synchronous stuff at the start of `StartAsync` onto another thread. That way `HandleAsync` only returns once `_cts` has been assigned, `MyService` then won't be accessed by multiple threads, and doesn't need to be thread-safe – canton7 Jan 25 '23 at 10:20
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/251387/discussion-between-theateist-and-canton7). – theateist Jan 25 '23 at 14:45
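
The last suggestion in the comments (call the handler directly rather than through `Task.Run`) can be sketched as follows. This is a minimal illustration, not code from the thread: the class `LatestOnlyProcessor` and its members are hypothetical, and it assumes a single consumer loop calls `ProcessAsync` for each message in order.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: cancels and awaits the previous handler before
// starting the next one. Not thread-safe; intended for one consumer loop.
public sealed class LatestOnlyProcessor
{
    private readonly Func<string, CancellationToken, Task> _handler;
    private CancellationTokenSource? _cts;
    private Task _current = Task.CompletedTask;

    public LatestOnlyProcessor(Func<string, CancellationToken, Task> handler)
        => _handler = handler;

    public async Task ProcessAsync(string msg)
    {
        await StopAsync(); // stop the previous message, if any

        _cts = new CancellationTokenSource();
        // Direct call, no Task.Run: the token exists and has been handed to
        // the handler before this method returns, so a later stop cannot miss it.
        _current = _handler(msg, _cts.Token);
    }

    public async Task StopAsync()
    {
        if (_cts == null) return;

        _cts.Cancel();
        try { await _current; }
        catch (OperationCanceledException) { } // expected when a run is canceled
        finally
        {
            _cts.Dispose();
            _cts = null;
            _current = Task.CompletedTask;
        }
    }
}
```

Because the synchronous part of the handler runs inside `ProcessAsync`, there is no window in which a message is scheduled but its token does not yet exist. Any cleanup that must always happen, such as `_provider.StopAsync()`, could go in a `finally` block inside the handler.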