I am losing my mind. Look at this code using a concurrent queue:
private readonly ConcurrentQueue<T> _queue;
private bool _isSaveActivated = false;
public void Save(T model)
{
_queue.Enqueue(model);
Debug.WriteLine("Enqueue " + model.ToString()); // <-- this changes everything!!!
StartProcess();
}
public void Flush()
{
Task? t = StartProcess();
if (t != null)
t.Wait();
}
private Task? StartProcess()
{
Task? t = null;
if (!_isSaveActivated)
{
lock (_lock) // ensure only one persisting loop running
{
if (!_isSaveActivated)
{
_isSaveActivated = true;
t = Task.Run(() => ExecuteProcess());
}
}
}
return t;
}
private void ExecuteProcess()
{
// this will run until queue is empty
// but lets break by 10 not to delay too much
int count = 0;
List<T> saveList = new List<T>();
try
{
while (_queue.TryDequeue(out T? item))
{
count++;
saveList.Add(item);
// if behind picked item there is empty queue, we should save as many items as there are
if (count == 10 || !_queue.TryPeek(out T? nextItem))
{
Save(saveList);
// reset values for the next 10-item loop
count = 0;
saveList.Clear();
}
}
}
catch (Exception ex)
{
// trace
}
finally
{
// unblock the next potential thread
_isSaveActivated = false;
}
void Save(IEnumerable<T> persistList) // local
{
_dbContextProxy.AddRange(saveList);
_dbContextProxy.Save();
}
}
And in the unit test, a simple loop
for (int i = 1; i <= totalCount; i++)
persister.Save(new TestModel1() { Id = i, Desc = "Item " + i });
Assert.AreEqual(633, proxy.TototalCount, "Total count should be {0}", 633);
Here is the freak show. I have the collector, every time _dbContextProxy.Save();
runs, total processed items is added to. When this line is commented out // Debug.WriteLine("Enqueue " + model.ToString()); <-- this changes everything!!!
my totals is '0'. But when I have that line uncommented - it works!! Both, RUN and Debug. And I did it like 20 times already - commented, uncommented and recompiled in between. And this behavior reproducible each time.
One thing I must add that this whole code being run from Task.Run(...)
somewhere down the stack.
What do you see? Help appreciated
This is example of the trace I get when Trace
is added. Also works, just as Debug
Item 209
Item 210
"SAVED" 8
Item 211
Item 212
"SAVED" 6
"SAVED" 2
Item 213
StartProcess: About to execute new thread
Item 214
Item 215
Item 216
Item 217
Item 218
Item 373
Item 374
"SAVED" 10
"SAVED" 10
Item 375
Item 376
"SAVED" 10
Item 377
"SAVED" 10
Item 378
Item 379
Item 380
Item 381
Item 382
Item 383
Item 384
"SAVED" 10
"SAVED" 10
Item 385
Item 386
Item 387
"SAVED" 10
"SAVED" 10
"SAVED" 10
Item 388
Item 389
"SAVED" 10
Item 390
Item 391
"SAVED" 10
"SAVED" 10
"SAVED" 10
Item 392
Item 393
"SAVED" 10
"SAVED" 10
Item 394
Item 395
Item 396
Item 397
Item 398
Item 399
Item 400
Item 401
Item 402
"SAVED" 10
Item 403
Item 404
Item 405
"SAVED" 10
"SAVED" 10
Item 406
Item 407
Item 408
"SAVED" 10
Item 409
Item 410
Item 411
"SAVED" 6
"SAVED" 3
Item 412
StartProcess: About to execute new thread
Item 413
Item 414
Item 415