
I am losing my mind. Look at this code using a concurrent queue:

private readonly ConcurrentQueue<T> _queue;
private bool _isSaveActivated = false;

public void Save(T model)
{
    _queue.Enqueue(model);

    Debug.WriteLine("Enqueue " + model.ToString()); // <-- this changes everything!!!
    
    StartProcess();
}

public void Flush()
{
    Task? t = StartProcess();
    if (t != null)
        t.Wait();
}

private Task? StartProcess()
{
    Task? t = null;
    
    if (!_isSaveActivated)
    {
        lock (_lock) // ensure only one persisting loop running
        {
            if (!_isSaveActivated)
            { 
                _isSaveActivated = true;
                
                t = Task.Run(() => ExecuteProcess());
            }
        }
    }

    return t; 
}

private void ExecuteProcess()
{
    // this will run until queue is empty
    // but lets break by 10 not to delay too much
    int count = 0;
    List<T> saveList = new List<T>();

    try
    {
        while (_queue.TryDequeue(out T? item))
        {
            count++;
            saveList.Add(item);

            // if behind picked item there is empty queue, we should save as many items as there are
            if (count == 10 || !_queue.TryPeek(out T? nextItem))
            {
                Save(saveList);
                // reset values for the next 10-item loop
                count = 0;
                saveList.Clear();
            }
        }
    }
    catch (Exception ex)
    {
        // trace
    }
    finally
    {
        // unblock the next potential thread
        _isSaveActivated = false;
    }

    void Save(IEnumerable<T> persistList) // local
    {
        _dbContextProxy.AddRange(saveList);
        _dbContextProxy.Save();
    }
}

And in the unit test, a simple loop

for (int i = 1; i <= totalCount; i++)
    persister.Save(new TestModel1() { Id = i, Desc = "Item " + i });

Assert.AreEqual(633, proxy.TototalCount, "Total count should be {0}", 633);

Here is the freak show. I have a collector: every time `_dbContextProxy.Save();` runs, the number of processed items is added to a running total. When the line `Debug.WriteLine("Enqueue " + model.ToString());` (the one marked "this changes everything!!!") is commented out, my total is 0. But when that line is uncommented, it works, in both Run and Debug. I have done this about 20 times already: commented, uncommented, and recompiled in between. The behavior is reproducible each time.

One thing I must add: this whole code is run from Task.Run(...) somewhere down the stack.

What do you see? Help appreciated

This is an example of the trace I get when Trace is added. It also works, just as with Debug.

Item 209
Item 210
"SAVED" 8
Item 211
Item 212
"SAVED" 6
"SAVED" 2
Item 213
StartProcess: About to execute new thread
Item 214
Item 215
Item 216
Item 217
Item 218

Item 373
Item 374
"SAVED" 10
"SAVED" 10
Item 375
Item 376
"SAVED" 10
Item 377
"SAVED" 10
Item 378
Item 379
Item 380
Item 381
Item 382
Item 383
Item 384
"SAVED" 10
"SAVED" 10
Item 385
Item 386
Item 387
"SAVED" 10
"SAVED" 10
"SAVED" 10
Item 388
Item 389
"SAVED" 10
Item 390
Item 391
"SAVED" 10
"SAVED" 10
"SAVED" 10
Item 392
Item 393
"SAVED" 10
"SAVED" 10
Item 394
Item 395
Item 396
Item 397
Item 398
Item 399
Item 400
Item 401
Item 402
"SAVED" 10
Item 403
Item 404
Item 405
"SAVED" 10
"SAVED" 10
Item 406
Item 407
Item 408
"SAVED" 10
Item 409
Item 410
Item 411
"SAVED" 6
"SAVED" 3
Item 412
StartProcess: About to execute new thread
Item 413
Item 414
Item 415
T.S.
  • Could you include the definition of the `_isSaveActivated` field? I am interested if it's declared as `volatile`. Including also the definitions of the other fields (like the `_queue`) might be helpful. – Theodor Zoulias Jun 29 '23 at 03:08
  • @TheodorZoulias No. I have a lock. No volatile: `private bool _isSaveActivated = false;` – T.S. Jun 29 '23 at 03:09
  • You are reading the `_isSaveActivated` outside of the `lock`. This is the infamous [double-checked locking](https://stackoverflow.com/questions/394898/double-checked-locking-in-net) pattern. I would suggest to check if your application works correctly without it. – Theodor Zoulias Jun 29 '23 at 03:13
  • @TheodorZoulias but if I remove the double-checked lock, I risk running more than 1 thread. I want only 1 running at any given time – T.S. Jun 29 '23 at 03:14
  • Just remove the outer check `if (!_isSaveActivated)`, and leave the inner check (inside the `lock`). – Theodor Zoulias Jun 29 '23 at 03:16
  • I would skip the `_queue.TryPeek` and just force a `Save(saveList);` outside the loop. Right now you are only blocking the first call to `StartProcess`. Maybe you should rewrite the whole thing to use a `SemaphoreSlim` with actual `async` code, instead of sync over async. – Jeremy Lakeman Jun 29 '23 at 03:20
  • @TheodorZoulias applied your suggestion. Ran 3 times with debug, and 3 times without debug. The behavior is the same – T.S. Jun 29 '23 at 03:20
  • As a side-note in the method `ExecuteProcess` couldn't you remove the local variable `int count`, and use the `saveList.Count` instead? – Theodor Zoulias Jun 29 '23 at 03:21
  • @TheodorZoulias same behavior – T.S. Jun 29 '23 at 03:25
  • @JeremyLakeman good idea to try something else. obviously. Can't do right now – T.S. Jun 29 '23 at 03:25
  • I would add an awaitable to the queue, so you can unblock each `Save()` call when the related `_dbContextProxy.Save` has returned. That way you don't return from `Save` early, or late. And you can propagate exceptions. – Jeremy Lakeman Jun 29 '23 at 03:29
  • I would suggest to consider another approach. Instead of using a naked `ConcurrentQueue` and spawning short-lived consumers (`ExecuteProcess`), use a higher-level queue with blocking/async capabilities like the [`Channel`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels.channel-1) (which is based on a `ConcurrentQueue` internally). You could look at [this question](https://stackoverflow.com/questions/70696864/consume-all-messages-in-a-system-threading-channels-channel "Consume all messages in a System.Threading.Channels.Channel") for ideas. – Theodor Zoulias Jun 29 '23 at 03:30
  • @JeremyLakeman can you be more specific , please – T.S. Jun 29 '23 at 03:30
  • @TheodorZoulias Thanks. I know about blocking collection. Obviously another approach is in the books. BTW, `Trace` calls also make this pass. But no other calls so far. – T.S. Jun 29 '23 at 03:34
  • Emptying the queue and setting `_isSaveActivated = false` is not atomic, another caller can queue an item without triggering processing. With 2 calls to `Save()`, it's possible for only one call to block. Since you don't have a timeout, you are sending the first request immediately. Instead, you might consider something like https://stackoverflow.com/questions/72313288/how-to-batch-a-channelreadert-enforcing-a-maximum-interval-policy-between-con to batch requests together. – Jeremy Lakeman Jun 29 '23 at 03:43
  • Please show the definition of `TestModel1`, I think something happened in its ToString method. – shingo Jun 29 '23 at 03:45
  • @shingo POCO. int Id, string Desc, ToString returns Desc – T.S. Jun 29 '23 at 03:47
  • What is `proxy`? How does `TototalCount` get updated? – shingo Jun 29 '23 at 03:50
  • @JeremyLakeman *" another caller can queue an item without triggering processing."* This is the exact effect I try to achieve. ... About setting variable is not atomic.. why? I set the value to true, run the thread and in the end , when I am out of the queue loop , I set it back to false, so the next caller can start the loop again. – T.S. Jun 29 '23 at 03:54
  • @shingo the proxy just counts the number of records processed. When I have that line commented out, the proxy counts 0; with Debug, it counts 633, as expected – T.S. Jun 29 '23 at 03:55
  • If you haven't heard of this, its great for understanding multi-threaded apps... https://marketplace.visualstudio.com/items?itemName=mayerwin.DebugSingleThread – Jeremy Thompson Jun 29 '23 at 04:12
  • I guess that there is a race condition between the `Save` call and the testing line. `Save` enqueues items and a background task handles those items. There is no synchronization that waits until the background task has handled them. Your debug line is time consuming, and that's why it works if this line is present. Your test should also work if you put a `Task.Delay(1000)` after your `for`-loop. But that's not the solution. You need some trigger that indicates that the inner task is finished. The already suggested `Channels` may be a solution because they contain an element counter. – Sebastian Schumann Jun 29 '23 at 04:20
  • @SebastianSchumann is right, I can confirm that the problem is that the unit test runs too fast. – shingo Jun 29 '23 at 04:32
  • @SebastianSchumann `Task.Delay(1000)` did not do anything. You see, my goal is that writing happens on only one thread at any time. Multiple threads can add to the queue. But only one consumer should run – T.S. Jun 29 '23 at 04:32
  • @shingo let me try again... – T.S. Jun 29 '23 at 04:33
  • The `Task.Delay(1000)` has to be waited or awaited, otherwise it has no effect. Just creating the `Task` is practically instantaneous. – Theodor Zoulias Jun 29 '23 at 04:36
  • @shingo This is a unit test, hence I did `Thread.Sleep(1000)`. You're right. Gosh. Now I know what to look for!! Thanks – T.S. Jun 29 '23 at 04:44
  • @shingo but this is interesting. Even if I set a break point and "wait", I see 633 items, so nothing pushes them out. HA! – T.S. Jun 29 '23 at 04:48
  • A break point only breaks the executing thread, meanwhile the `ExecuteProcess` is still running. – shingo Jun 29 '23 at 04:59

1 Answer

At least one problem with the example code is that you have a race condition. If Save enqueues an item and checks _isSaveActivated after the while loop has exited, but before it is set to false, the next ExecuteProcess will not be started, and the item stays in the queue.

Another potential problem is that _isSaveActivated is used from multiple threads without synchronization. This permits various optimizations (such as caching or reordering of reads and writes) and other bad things to happen. All data that can be read and written concurrently should be protected by some form of synchronization.

A third problem is that you need some way to wait until all items have been properly saved, for example when closing your application. Or in a test, like in this specific case.
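If you want to keep the "start a short-lived consumer on demand" design, the three problems above can be addressed together. The following is only a minimal sketch under stated assumptions: the class name `BatchingPersister`, the `saveBatch` delegate (standing in for `_dbContextProxy`), and the `Drain` method are hypothetical names, not part of the question's code. The key idea is that the consumer decides "the queue is empty, I am done" while holding the same lock that `Save` uses to check the flag, so an item can no longer slip in between.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical rework of the question's persister. The class name and the
// saveBatch delegate (standing in for _dbContextProxy) are illustrative.
public sealed class BatchingPersister<T>
{
    private const int BatchSize = 10;

    private readonly ConcurrentQueue<T> _queue = new();
    private readonly Action<IReadOnlyList<T>> _saveBatch;
    private readonly object _lock = new();

    // Both fields are read and written only while holding _lock.
    private bool _isDraining;
    private Task _drain = Task.CompletedTask;

    public BatchingPersister(Action<IReadOnlyList<T>> saveBatch) => _saveBatch = saveBatch;

    public void Save(T model)
    {
        _queue.Enqueue(model);
        lock (_lock)
        {
            if (_isDraining) return; // the running loop will still see this item
            _isDraining = true;
            _drain = Task.Run(Drain);
        }
    }

    // Blocks until every item enqueued before this call has been saved.
    // If a save threw, Wait() rethrows it wrapped in an AggregateException.
    public void Flush()
    {
        Task drain;
        lock (_lock) drain = _drain;
        drain.Wait();
    }

    private void Drain()
    {
        var batch = new List<T>(BatchSize);
        try
        {
            while (true)
            {
                while (_queue.TryDequeue(out var item))
                {
                    batch.Add(item);
                    if (batch.Count == BatchSize) SaveAndClear(batch);
                }
                if (batch.Count > 0) SaveAndClear(batch);

                lock (_lock)
                {
                    // Checking for emptiness and clearing the flag happen
                    // under the same lock that Save uses, so no item can be
                    // enqueued "in between" without restarting the loop.
                    if (_queue.IsEmpty)
                    {
                        _isDraining = false;
                        return;
                    }
                }
            }
        }
        catch
        {
            lock (_lock) _isDraining = false; // allow a later Save to restart
            throw;
        }
    }

    private void SaveAndClear(List<T> batch)
    {
        _saveBatch(batch);
        batch.Clear();
    }
}
```

In a test like the one in the question, calling `persister.Flush()` after the loop and before the assert is what makes the count deterministic; the sketch does not try to recover items that were dequeued when a save failed.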

So what should you do? One alternative is to use a single thread to do all the saving, and a blocking queue to give it data. Something like this:

private BlockingCollection<int> myQueue = new(); // Uses a backing concurrentQueue by default
private Task queueTask;

public void Start() => queueTask = Task.Run(ProcessQueue);

private void ProcessQueue()
{
    foreach (var item in myQueue.GetConsumingEnumerable())
    {
        // process item
    }
}

public Task Stop()
{
    // Signal the queue that no more items will be added, and that the loop should exit
    myQueue.CompleteAdding(); 
    return queueTask; // returned task should either be awaited, or waited for
}

Note that exception handling has been omitted for brevity. A potential downside of this approach is that you will use some extra resources for a thread that might spend most of its time blocked, but if you only use one such queue I would not worry about it.
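To keep the question's "save in groups of up to 10" behavior with this approach, the consuming loop can top up each batch with whatever is already waiting. This is a rough sketch, not a definitive implementation: `BatchingQueueWorker`, `saveBatch`, and `StopAsync` are hypothetical names, and exception handling is again left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical single-consumer worker; saveBatch stands in for the
// question's _dbContextProxy.AddRange(...) + Save() pair.
public sealed class BatchingQueueWorker<T>
{
    private readonly BlockingCollection<T> _queue = new();
    private readonly Action<IReadOnlyList<T>> _saveBatch;
    private readonly int _batchSize;
    private readonly Task _worker;

    public BatchingQueueWorker(Action<IReadOnlyList<T>> saveBatch, int batchSize = 10)
    {
        _saveBatch = saveBatch;
        _batchSize = batchSize;
        _worker = Task.Run(ProcessQueue); // the one and only consumer
    }

    public void Save(T model) => _queue.Add(model);

    // Signals that no more items will be added; the returned task completes
    // once everything already queued has been saved.
    public Task StopAsync()
    {
        _queue.CompleteAdding();
        return _worker;
    }

    private void ProcessQueue()
    {
        var batch = new List<T>(_batchSize);
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            batch.Add(item);

            // Top up with items that are already waiting, without blocking.
            while (batch.Count < _batchSize && _queue.TryTake(out var next))
                batch.Add(next);

            _saveBatch(batch);
            batch.Clear();
        }
    }
}
```

A test would call `worker.StopAsync().Wait()` (or await it) before asserting on the saved count.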

Another approach is to use a task scheduler with a concurrency limit of one to start your tasks, such as the LimitedConcurrencyLevelTaskScheduler sample from the TaskScheduler documentation. This moves the synchronization problem to the task scheduler.
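As one possible illustration of the scheduler idea, the built-in `ConcurrentExclusiveSchedulerPair` can be swapped in for a custom scheduler: tasks started on its `ExclusiveScheduler` run one at a time. The wrapper below is a sketch only; `SerializedSaver` and the `save` delegate are hypothetical names, and it saves item by item rather than in batches.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical wrapper: every save runs as its own task, but the exclusive
// scheduler never runs two of those tasks at the same time.
public sealed class SerializedSaver<T>
{
    private readonly TaskFactory _exclusive =
        new(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);
    private readonly Action<T> _save;

    public SerializedSaver(Action<T> save) => _save = save;

    // The returned task completes when this particular item has been saved,
    // so callers (or a test) can wait for it and observe exceptions.
    public Task Save(T model) => _exclusive.StartNew(() => _save(model));
}
```

Because each call returns its own task, a caller can collect the tasks and use `Task.WaitAll` or `Task.WhenAll` instead of a separate flush method.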

There are also frameworks like TPL Dataflow that might be of use.
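For example, a Dataflow pipeline could look roughly like the sketch below, which links a `BatchBlock<T>` to an `ActionBlock<T[]>`. The names `DataflowPersister`, `saveBatch`, `TriggerPartialBatch`, and `StopAsync` are hypothetical, and depending on the target framework the `System.Threading.Tasks.Dataflow` package may need to be referenced. Note one behavioral difference from the question's code: a `BatchBlock` holds items until a full batch is available, unless `TriggerBatch` or `Complete` is called.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical pipeline: items -> groups of up to batchSize -> single saver.
public sealed class DataflowPersister<T>
{
    private readonly BatchBlock<T> _batcher;
    private readonly ActionBlock<T[]> _saver;

    public DataflowPersister(Action<T[]> saveBatch, int batchSize = 10)
    {
        _batcher = new BatchBlock<T>(batchSize);

        // An ActionBlock processes one message at a time by default,
        // so saveBatch is never invoked concurrently.
        _saver = new ActionBlock<T[]>(saveBatch);

        _batcher.LinkTo(_saver, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public bool Save(T model) => _batcher.Post(model);

    // Pushes out whatever is currently buffered, even if it is fewer
    // than batchSize items.
    public void TriggerPartialBatch() => _batcher.TriggerBatch();

    // Completes the pipeline; the remaining partial batch is emitted and
    // the returned task finishes after the last save.
    public Task StopAsync()
    {
        _batcher.Complete();
        return _saver.Completion;
    }
}
```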

This is the thing with any multithreaded code: you need to be very careful to avoid unintended race conditions and the other hazards that are unique to multithreaded programs. While this specific case failed early and obviously, some bugs only appear in very specific circumstances, and according to Murphy they will show up when you are doing something really important, not while you are developing or testing the application. Because of this, you need to know a fair bit about multithreading before you attempt it. You obviously know about locks, but there are more things you need to consider.

JonasH
  • Although I am planning to refactor this using a blocking queue I want to see if this one is solvable. Going through your comment, `but before it is set to false, the next ExecuteProcess will not be started.` - that is actually a goal here. I did not want any process start at that time. Next "save" was supposed to restart the queue and `Flush` would finish anything remaining. – T.S. Jun 29 '23 at 16:31
  • Yea, I hear ya. I've written a good amount of producer-consumer and other multithreaded code. In fact, this code is supposed to handle callers that run on multiple threads. I'm just not able to polish this concept of "periodic single thread start with no more than one thread running". As a short-term solution, I can make this a single-thread deal because this is write-out related and it is not supposed to execute constantly, and it will be done on a separate thread already. I also don't care about resources because this is a transient application. It should run for a few minutes. And the goal is.. – T.S. Jun 29 '23 at 16:41
  • .. for it to run a few minutes and not an hour.... as previous version did, – T.S. Jun 29 '23 at 16:41
  • @T.S. My point is that the models (all except the first) are not guaranteed to be saved, not even after flushing, due to the race condition. I'm not even sure you are guaranteed not to use the dbContext concurrently, due to optimizations moving things around. And these kinds of errors are considered a big deal since they can easily result in unreliable programs. – JonasH Jun 30 '23 at 06:22