1

I have a queue of data that must be processed.

public static ConcurrentQueue<Data> queue = new ConcurrentQueue<Data>();

A bool variable that blocks thread generating if a current thread is running.

private static bool busy = false;

A timer that runs each 5000ms and runs a thread to process that queue sequentially.

timer = new System.Timers.Timer();
timer.Elapsed += Timer_Elapsed;
timer.Interval = 5000;
timer.Enabled = true;

private static void Timer_Elapsed(object? sender, System.Timers.ElapsedEventArgs e)
{
    if (!busy)
    {
        Thread thread= new Thread(Task);
        thread.Start();
    }
}

And the task function that must be run by every generated thread in background:

private static void Task()
{
    Console.WriteLine("im busy");
    busy= true;
    Data data= null;
    if (queue.Count > 0)
    {
        bool ok= queue.TryDequeue(out data);
        if (ok && data!= null)
        {
            try{
                LongTaskSync(data);
            }catch(Exception){
                Console.WriteLine("Error in longtasksync");
            }
        }
    }
   
    Console.WriteLine("im not busy");
    busy= false;
}

The problem is that busy variable sometimes get stuck to true and never becomes false. Don't know why. So queue gets full and no threads are generated to process the queue because the

if (!busy)
{
    Thread thread= new Thread(Task);
    thread.Start();
}

block is never reached.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
E. Williams
  • 405
  • 1
  • 6
  • 21
  • 1
    If you change the definition to `private static volatile bool busy = false;` does the problem go away? – Enigmativity Feb 23 '22 at 09:24
  • No, i also tried with `volatile` and the problem is still there – E. Williams Feb 23 '22 at 09:25
  • 2
    You sure that `LongTaskSync()` isn't throwing an exception? Because if it does, `busy` will be stuck at `true`. – Matthew Watson Feb 23 '22 at 09:32
  • Try adding a lock `object` and use the lock whenever you access the `busy` variable. (I would also consider changing `busy` into a private property and use the lock inside its accessors) Example: https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/statements/lock – Zserbinator Feb 23 '22 at 09:32
  • Most likely you're either stuck in a loop or crashed the thread (Exception thrown) in `LongTaskSync(data)` call, so the `busy = false` is never reached. – CoolBots Feb 23 '22 at 09:33
  • @MatthewWatson you beat me to it, lol! – CoolBots Feb 23 '22 at 09:34
  • @MatthewWatson I also thought the same, so I closed that line inside a `try-catch` block to be sure that `busy=false` is executed, but it won't work too.. – E. Williams Feb 23 '22 at 09:36
  • 1
    I think it would be useful to have an idea what `LongTaskSync` is doing, there might be some loop or anything. For example if you change `LongTaskSync(data)` with simple `Thread.Sleep(n)` does the problem go away? – E. Shcherbo Feb 23 '22 at 09:43
  • It seems that you are trying to prevent overlapping executions of the `LongTaskSync` method. You can take a look at [this](https://stackoverflow.com/questions/30462079/run-async-method-regularly-with-specified-interval) question for ideas about simpler ways to do it. – Theodor Zoulias Feb 23 '22 at 12:45
  • Thanks @TheodorZoulias, indeed thats the aim of my question, I want that function to execute atomicaly, it cannot be never executed concurrently because it involves Database operations. But I dont find how your link can help me to achieve that – E. Williams Feb 23 '22 at 21:17

3 Answers3

1

I'm not posting this as a direct answer to your question, but if you're open to an alternative approach, I can suggest you use Microsoft's Reactive Framework (aka Rx) - NuGet System.Reactive and add using System.Reactive.Linq; - then you can do this:

IEnumerable<Data> Dequeue()
{
    while (queue.TryDequeue(out Data data))
    {
        yield return data;
    }
}

IObservable<long> query =
        Observable
            .Interval(TimeSpan.FromSeconds(5.0))
            .Select(_ => Dequeue().ToObservable())
            .Concat();

IDisposable subscription = query.Subscribe(data => LongTaskSync(data));

That handles all of the concurrency for you. There's no need to check any busy variable.

I prefer not to run code in the subscrinbe, so to run it in the query itself, and to ensure that only one LongTaskSync runs at a time, you can use an EventLoopScheduler like this:

IObservable<Unit> query =
        Observable
            .Using(
                () => new EventLoopScheduler(),
                els =>
                    Observable
                        .Interval(TimeSpan.FromSeconds(5.0))
                        .Select(_ => Dequeue().ToObservable())
                        .Concat()
                        .SelectMany(data =>
                            Observable
                                .Start(() => LongTaskSync(data), els)));


IDisposable subscription = query.Subscribe();
Enigmativity
  • 113,464
  • 11
  • 89
  • 172
0

It seems that you want to invoke a synchronous action periodically on background threads, enforcing a non overlapping execution policy. A relatively easy way to do it is to implement an asynchronous loop, based on the Task.Delay method or the new (.NET 6) PeriodicTimer class. Here is a PeriodicTimer-based implementation:

CancellationTokenSource cts = new();

Task consumer = Task.Run(async () =>
{
    using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(5000));
    while (true)
    {
        try { await timer.WaitForNextTickAsync(cts.Token); }
        catch (OperationCanceledException) { break; }

        try
        {
            if (queue.TryDequeue(out var data))
            {
                LongTaskSync(data);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error in LongTaskSync");
        }
    }
});

The Task.Run method offloads the asynchronous loop on the ThreadPool. The WaitForNextTickAsync suspends the loop until the next tick of the timer. The CancellationTokenSource can be used in order to stop the loop before terminating the process, for a clean and graceful termination:

cts.Cancel();
consumer.Wait(); // or await

This way the process will not be killed in the midst of a LongTaskSync execution.

If you prefer a Task.Delay-based implementation, you can look here for the basic idea.

In case you would like to process the data in the queue as soon as they are available, and not periodically, you could look at the Channel<T> or the ActionBlock<T> components.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
-1

It was easier than I thought, just using a Mutex instead of a boolean var.

private static Mutex mut = new Mutex();

private static void Task()
{
    Console.WriteLine("im busy");
    //busy= true;
    mut.WaitOne();
    Data data= null;
    if (queue.Count > 0)
    {
        bool ok= queue.TryDequeue(out data);
        if (ok && data!= null)
        {
            try{
                LongTaskSync(data);
            }catch(Exception){
                Console.WriteLine("Error in longtasksync");
            }
        }
    }
   
    Console.WriteLine("im not busy");
    //busy= false;
    mut.ReleaseMutex();
}
E. Williams
  • 405
  • 1
  • 6
  • 21