I have this couple of methods:
private static bool loaded = false;
private static bool replaying = false;
private static string wIndex = String.Empty;
private static WorldData wData;
private static ConcurrentDictionary<int, List<long>> streamPosition
= new ConcurrentDictionary<int, List<long>>();
private static ConcurrentDictionary<int, List<string>> collectionNames
= new ConcurrentDictionary<int, List<string>>();
private static async void StartReplay()
{
try
{
Stopwatch st = new Stopwatch();
while (loaded)
{
while (replaying)
{
st.Start();
for (int i = 0; i < collectionNames.Count; i++)
{
XLogger.Log(toConsole.Debug, Thread.CurrentThread.ManagedThreadId
.ToString());
wData.CopyCollection(await DeserializeListFromStreamAsync(
wData.GetCollectionByName(collectionNames[Thread.CurrentThread
.ManagedThreadId][i]), i, new CancellationToken()));
}
st.Stop();
int sleepTime = DebriefingManager.replayRate
- (int)st.ElapsedMilliseconds;
if (sleepTime > 0)
{
Thread.Sleep(sleepTime);
}
else
{
XLogger.Log(toConsole.Bad, "Debriefing is slow, video may lag.");
XLogger.Log(toFile.System, "Debriefing is slow, video may lag.");
}
st.Reset();
}
}
}
catch (Exception e)
{
XLogger.Log(toConsole.Bad, e.ToString());
XLogger.Log(toFile.Error, e.ToString());
}
}
private static async Task<ConcurrentDictionary<string, T>>
DeserializeListFromStreamAsync<T>(
ConcurrentDictionary<string, T> coll, int i, CancellationToken cancellationToken)
{
var dataStructures = new ConcurrentDictionary<string, T>();
using (FileStream stream = File.OpenRead(DebriefingManager
.GetReadingStreamByCollection(coll)))
{
stream.Position = streamPosition[Thread.CurrentThread.ManagedThreadId][i];
using (var streamReader = new MessagePackStreamReader(stream))
{
XLogger.Log(toConsole.Debug,
$"{Thread.CurrentThread.ManagedThreadId} --- test 1");
ReadOnlySequence<byte>? msgpack = await streamReader
.ReadAsync(cancellationToken);
XLogger.Log(toConsole.Debug,
$"{Thread.CurrentThread.ManagedThreadId} --- test 2");
if (msgpack is null) return null;
dataStructures = MessagePackSerializer
.Deserialize<ConcurrentDictionary<string, T>>(
(ReadOnlySequence<byte>)msgpack, cancellationToken: cancellationToken);
}
streamPosition[Thread.CurrentThread.ManagedThreadId][i] = stream.Position;
}
return dataStructures;
}
StartReplay
is run by three different threads.
I need to have a unique id for each thread as I need the List<long>
and List<string>
to be unique for each one. So I thought about using ConcurrentDictionaries
and the Thread.CurrentThread.ManagedThreadId
as a key.
The first thing I tried was to use Thread.CurrentThread.ManagedThreadId
but I discovered that after this line: ReadOnlySequence<byte>? msgpack = await streamReader.ReadAsync(cancellationToken);
the Id changed. Not knowing that it should be immutable I thought nothing of it and tried to use the [ThreadStatic]
attribute, but after that same line the value of the variable tagged was reset to 0.
After using the Thread debug window I found out that the threads that ran my code were "killed" after that line and new ones were used to continue the code.
My question than is: why does this happen? And how do I prevent it? Might this be impacting performance?
EDIT: I should also add that the method is a modified version of the one in the MessagePack documentation in the "Multiple MessagePack structures on a single Stream " section.