I want to create a working thread that starts on signal(task was added to shared task list), and will rest when done.
Requirements:
- Other system threads can add task any time
- the working thread should rest if it has nothing to-do
- if more tasks are added while the working thread is active, it should complete them too.
- working thread can rest for hours -> work arrive like raindrops(sometime we have a storm)
After thread adds new task to the workingList(shared list) it signals(AutoRestEvent.Set()) the working-thread to start working.
I have race condition between the Set() and the WaitOne() functions.
public static void AddWork(object obj)
{
Monitor.Enter(_syncO);
_workingList.Add(obj);
_signal.Set();
Monitor.Exit(_syncO);
}
static object _syncO = new object();
static AutoResetEvent _signal = new AutoResetEvent(false);
static List<object> _workingList = new List<object>();
static void DoWork()
{
Thread tradeThread1 = new Thread(() =>
{
Thread.CurrentThread.IsBackground = true;
string tradeMessage = string.Empty;
while (true)
{
Monitor.Enter(_syncO);
var arr = _workingList.ToArray();
Monitor.Exit(_syncO);
// race condition when the set happens just before the
// thread was locked
if (arr.Count() == 0)
_signal.WaitOne();
Monitor.Enter(_syncO);
arr = _workingList.ToArray();
Monitor.Exit(_syncO);
int count = 0;
var deleteList = new List<object>();
while (true)
{
foreach (var item in arr)
{
// the value is changing every iteration.
// this is why I need the Sleep at the end
bool b = Handle4(item);
if (b)
deleteList.Add(item);
}
if (count == 100)
break;
Thread.Sleep(100);
count++;
}
// remove done tasks from _workingList
RemoveItems(deleteList);
// we can't close, so we re-set footprints(notifications) on our price cable. -> execute on broker tick(the right tick)
// reHang trade on cable
foreach (var item in _workingList)
{
// re-use the undeleted tasks
}
}
});
tradeThread1.Start();
}
Based on the help of @John Wu I come up with the following solution: The BlockingCollection will act as gate for the working thread. each iteration will copy the new task
private static void AddWork(object tap)
{
queue.Add(tap);
}
private static BlockingCollection<object> queue = new BlockingCollection<object>();
static void Work()
{
Thread tradeThread1 = new Thread(() =>
{
while (true)
{
var workingList = new List<object>();
var deleteList = new List<object>();
var reEvaluateList = new List<object>();
while (true)
{
if (workingList.Count() == 0)
{
// thread will wait until new work arrives -> it will start working again on the first task to come in.
workingList.Add(queue.Take());
}
foreach (var item in workingList)
{
bool b = Handle4(item);
if (b)
deleteList.Add(item);
else
item.ExitCounter++;
if (item.ExitCounter == 1000)
reEvaluateList.Add(item);
}
RemoveItems(deleteList, workingList);
// we can't close, so we re-set
// we reevaluate tasks that are working for X amount of time and didn't finish
foreach (var item in reEvaluateList)
ReEvaluate(item);
RemoveItems(reEvaluateList, workingList);
// wait.. the item change-over-time, so a wait is a type of calculation.
Thread.Sleep(100);
// we want to avoid locking if we still have task to process
if (queue.Count() == 0)
continue;
// add new work to local list
workingList.Add(queue.Take());
}
}
});
tradeThread1.Start();
}
It feels a-little-bit messy. Any ideas on how to make it better?
>` so the producer can add several items at once with only one signal..
– John Wu Feb 13 '19 at 16:47