I have one thread that receives messages every 10 seconds and another thread that writes these messages to the database once a minute.
Each message comes from a sender, which is identified by serialNumber
in my case.
Therefore, I created a ConcurrentDictionary like below.
public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;
The key of the dictionary is serialNumber
and the value is the collection of messages received during one minute. I collect a minute of data so that, instead of going to the database every 10 seconds, I go only once per minute, which reduces the number of database writes to 1/6.
/// <summary>
/// Buffers incoming packets in one queue per device serial number and
/// periodically writes the buffered packets to the database in batches.
/// </summary>
public class ShotManager
{
    // Pause between two flush passes over all device queues, in milliseconds.
    private const int SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER = 25000;

    // Upper bound on packets taken from a single device queue per pass, so a
    // constantly-fed queue cannot occupy a worker indefinitely.
    private const int MAX_PACKETS_PER_SHOT = 10;

    // Read once from app settings in the constructor; controls the Spinner loop.
    private readonly bool ACTIVE_FILE_DB_SHOOT_THREAD = false;

    private List<Devices> _devices = new List<Devices>();

    /// <summary>Pending packets, keyed by device serial number.</summary>
    public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;

    /// <summary>
    /// Reads the "ACTIVE_LIST_DB_SHOOT" setting and pre-creates a queue for
    /// every device known to the database.
    /// </summary>
    public ShotManager()
    {
        ACTIVE_FILE_DB_SHOOT_THREAD = Utility.GetAppSettings("AppConfig", "0", "ACTIVE_LIST_DB_SHOOT") == "1";
        init();
    }

    /// <summary>
    /// Loads the device list and makes sure each device has a queue.
    /// Existing queues (and their content) are left untouched.
    /// </summary>
    private void init()
    {
        using (iotemplaridbContext dbContext = new iotemplaridbContext())
        {
            _devices = (from d in dbContext.Devices select d).ToList();
        }

        _dicAllPackets ??= new ConcurrentDictionary<string, ConcurrentQueue<PacketModel>>();

        foreach (var device in _devices)
        {
            // TryAdd is already a no-op when the key exists; a preceding
            // ContainsKey check only adds a second lookup.
            _dicAllPackets.TryAdd(device.SerialNumber, new ConcurrentQueue<PacketModel>());
        }
    }

    /// <summary>
    /// Flush loop: while enabled, processes every device queue in parallel and
    /// then sleeps before the next pass. Exceptions are logged and the loop
    /// continues.
    /// </summary>
    public void Spinner()
    {
        while (ACTIVE_FILE_DB_SHOOT_THREAD)
        {
            try
            {
                Parallel.ForEach(_dicAllPackets, devicePacket =>
                {
                    Thread.Sleep(100);
                    readAndShot(devicePacket);
                });
            }
            catch (Exception ex)
            {
                tLogger.EXC("Spinner exception for write...", ex);
            }

            // Sleep outside the try block so that a failing pass is also
            // followed by a pause instead of an immediate, tight retry.
            Thread.Sleep(SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER);
        }
    }

    /// <summary>
    /// Appends <paramref name="model"/> to the queue of
    /// <paramref name="serialNumber"/>, creating the queue on first use.
    /// Does nothing when the dictionary has not been created.
    /// </summary>
    /// <param name="serialNumber">Serial number of the sending device.</param>
    /// <param name="model">Packet to buffer.</param>
    public void EnqueueObjectToQueue(string serialNumber, PacketModel model)
    {
        if (_dicAllPackets == null)
        {
            return;
        }

        // GetOrAdd returns the queue actually stored for the key, so the packet
        // is enqueued even when this call is the one that creates the queue.
        // (Previously the first packet of an unknown serial number was dropped.)
        _dicAllPackets
            .GetOrAdd(serialNumber, _ => new ConcurrentQueue<PacketModel>())
            .Enqueue(model);
    }

    /// <summary>
    /// Takes up to <see cref="MAX_PACKETS_PER_SHOT"/> packets from one device
    /// queue, builds a single INSERT statement from them and sends it to the
    /// database. Returns without touching the database when nothing was taken.
    /// </summary>
    /// <param name="keyValuePair">Serial number and its pending-packet queue.</param>
    private void readAndShot(KeyValuePair<string, ConcurrentQueue<PacketModel>> keyValuePair)
    {
        ConcurrentQueue<PacketModel> queue = keyValuePair.Value;
        if (queue.IsEmpty)
        {
            return;
        }

        StringBuilder sb = new StringBuilder();
        sb.AppendLine($"INSERT INTO ......) VALUES(");

        int dequeuedCount = 0;

        //the reason why I don't use while(TryDequeue(out ..)){..} is there's constantly enqueue to this dictionary, so the thread will be occupied with a single device for so long
        for (int i = 0; i < MAX_PACKETS_PER_SHOT; i++)
        {
            if (queue.TryDequeue(out PacketModel packet) && packet != null)
            {
                dequeuedCount++;
                /*
                *** do something and fill the sb...
                */
            }
            else
            {
                Console.WriteLine("No packet found! For Device: " + keyValuePair.Key);
                break;
            }
        }

        // Nothing was dequeued (e.g. the queue was drained after the IsEmpty
        // check): there are no value rows, so do not send a broken statement.
        if (dequeuedCount == 0)
        {
            return;
        }

        // The last 5 characters are cut off before terminating the statement;
        // presumably a trailing row separator written by the fill code above.
        // NOTE(review): packets are already removed from the queue here, so
        // they are lost if insertIntoDB throws - confirm this is acceptable.
        insertIntoDB(sb.ToString()[..(sb.Length - 5)] + ";");
    }
}
EnqueueObjectToQueue
is called from a different class, as shown below.
/// <summary>
/// Wraps the raw JSON payload and its timestamp in a <c>PacketModel</c> and
/// hands it to the shot manager under the given serial number.
/// </summary>
/// <param name="serialNumber">Serial number of the sending device.</param>
/// <param name="jsonPacket">Payload stored in <c>MachineData</c>.</param>
/// <param name="messageTimeStamp">Timestamp stored in <c>DataInsertedAt</c>.</param>
private void packetToDictionary(string serialNumber, string jsonPacket, string messageTimeStamp)
{
    _shotManager.EnqueueObjectToQueue(
        serialNumber,
        new PacketModel { MachineData = jsonPacket, DataInsertedAt = messageTimeStamp });
}
The function above is called from the message handler itself.
/// <summary>
/// Message-received event handler: parses the event, stamps it with the
/// current local time and queues a thread-pool work item that forwards the
/// packet via <c>packetToDictionary</c>.
/// </summary>
/// <param name="sender">Event source.</param>
/// <param name="e">Event data the packet and serial number are parsed from.</param>
private void messageReceiveHandler(object sender, MessageReceviedEventArgs e)
{
    //do something...parse from e and call the func
    string jsonPacket = ""; //something parsed from e
    string serialNumber = ""; //something parsed from e

    // DateTime.Now is a property, not a method.
    string message_timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

    // Pass the packet parsed in THIS invocation (jsonPacket); the lambda
    // captures only locals, so each work item carries its own values.
    ThreadPool.QueueUserWorkItem(state => packetToDictionary(serialNumber, jsonPacket, message_timestamp));
}
The problem is that sometimes packets are enqueued under the wrong serialNumber
or are repeated (duplicate entries).
Is it a sound design to use a ConcurrentQueue
inside a ConcurrentDictionary
like this?