I have these two methods:

private static bool loaded = false;
private static bool replaying = false;
private static string wIndex = String.Empty;
private static WorldData wData;
private static ConcurrentDictionary<int, List<long>> streamPosition
    = new ConcurrentDictionary<int, List<long>>();
private static ConcurrentDictionary<int, List<string>> collectionNames
    = new ConcurrentDictionary<int, List<string>>();
private static async void StartReplay()
{
    try
    {
        Stopwatch st = new Stopwatch();
        while (loaded)
        {
            while (replaying)
            {
                st.Start();
                for (int i = 0; i < collectionNames.Count; i++)
                {
                    XLogger.Log(toConsole.Debug, Thread.CurrentThread.ManagedThreadId
                        .ToString());
                    wData.CopyCollection(await DeserializeListFromStreamAsync(
                        wData.GetCollectionByName(collectionNames[Thread.CurrentThread
                        .ManagedThreadId][i]), i, new CancellationToken()));
                }
                st.Stop();
                int sleepTime = DebriefingManager.replayRate
                    - (int)st.ElapsedMilliseconds;
                if (sleepTime > 0)
                {
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    XLogger.Log(toConsole.Bad, "Debriefing is slow, video may lag.");
                    XLogger.Log(toFile.System, "Debriefing is slow, video may lag.");
                }
                st.Reset();
            }
        }
    }
    catch (Exception e)
    {
        XLogger.Log(toConsole.Bad, e.ToString());
        XLogger.Log(toFile.Error, e.ToString());
    }
}

private static async Task<ConcurrentDictionary<string, T>>
    DeserializeListFromStreamAsync<T>(
    ConcurrentDictionary<string, T> coll, int i, CancellationToken cancellationToken)
{
    var dataStructures = new ConcurrentDictionary<string, T>();
    using (FileStream stream = File.OpenRead(DebriefingManager
        .GetReadingStreamByCollection(coll)))
    {
        stream.Position = streamPosition[Thread.CurrentThread.ManagedThreadId][i];
        using (var streamReader = new MessagePackStreamReader(stream))
        {
            XLogger.Log(toConsole.Debug,
                $"{Thread.CurrentThread.ManagedThreadId} --- test 1");
            ReadOnlySequence<byte>? msgpack = await streamReader
                .ReadAsync(cancellationToken);
            XLogger.Log(toConsole.Debug,
                $"{Thread.CurrentThread.ManagedThreadId} --- test 2");
            if (msgpack is null) return null;
            dataStructures = MessagePackSerializer
                .Deserialize<ConcurrentDictionary<string, T>>(
                (ReadOnlySequence<byte>)msgpack, cancellationToken: cancellationToken);
        }
        streamPosition[Thread.CurrentThread.ManagedThreadId][i] = stream.Position;
    }

    return dataStructures;
}

StartReplay is run by three different threads. I need to have a unique id for each thread as I need the List<long> and List<string> to be unique for each one. So I thought about using ConcurrentDictionaries and the Thread.CurrentThread.ManagedThreadId as a key.

The first thing I tried was Thread.CurrentThread.ManagedThreadId, but I discovered that after this line: ReadOnlySequence<byte>? msgpack = await streamReader.ReadAsync(cancellationToken); the ID changed. Not realizing that a thread's ID never changes, I thought nothing of it and switched to a field marked with the [ThreadStatic] attribute, but after that same line the field was reset to 0. Using the Threads debug window, I found that the threads running my code were "killed" after that line and new ones were used to continue the code.

My question then is: why does this happen? How do I prevent it? Might this be impacting performance?

EDIT: I should also add that the method is a modified version of the one in the "Multiple MessagePack structures on a single Stream" section of the MessagePack documentation.

Theodor Zoulias
MrBott_a
  • Are you running this code in ASP.NET Core? If so, there is no SynchronizationContext. You may want to read this: https://blog.stephencleary.com/2017/03/aspnetcore-synchronization-context.html – Rand Random Jan 24 '23 at 08:28
  • @RandRandom Nope, it's a console application in .NET Core 3.1. – MrBott_a Jan 24 '23 at 08:33
  • Then this could be of interest to you: https://devblogs.microsoft.com/pfxteam/await-synchronizationcontext-and-console-apps/ – Rand Random Jan 24 '23 at 08:36
  • Why do you want to prevent this? It's the nature of a Task. A Task is not a thread, and continuations in Tasks often mean changing threads. That is normal and not a problem. – Ralf Jan 24 '23 at 09:18
  • @Ralf I have three threads that each need different information. The point is to have three tasks awaiting at the same time and deserializing, as I need to deserialize 11 different collections every 100 ms. – MrBott_a Jan 24 '23 at 09:41
  • That doesn't answer why you need to maintain which thread they run on. They will run on threadpool threads (of which there are many), and there shouldn't be any reason why that's an issue. It usually improves performance as it means a task does not have to wait for the original thread to become available. – Charlieface Jan 24 '23 at 10:00
  • @Charlieface If you have a suggestion for how I can access different information from each thread running the same method, other than `Thread.CurrentThread.ManagedThreadId`, I am open to ideas. As a junior developer working with multithreading for the first time, this is the best way I found to do so. – MrBott_a Jan 24 '23 at 10:06
  • Just assign each async flow a different ID (a sequential int or maybe a GUID) and use that to access the dictionary. There is also `AsyncLocal<T>` if that's relevant. Hard to say what you need without a better understanding of what the code is actually doing and why you need to store the task state in a global dictionary. – Charlieface Jan 24 '23 at 10:11
  • `If you have a suggestion on how I can access different informations from each thread running the same method` You could pass a parameter to the task, and pass a different object to each invocation. – Matthew Watson Jan 24 '23 at 10:11
  • I realized that I explained myself wrong. The thread that needs to have the same id is the caller of the method I posted. – MrBott_a Jan 24 '23 at 10:23

1 Answer

Why does this happen?

Because this is the nature of the beast (asynchrony). The completion of asynchronous operations happens on a thread that is usually different than the thread that initiated the asynchronous operation. This is especially true for Console applications, that are not equipped with any specialized mechanism that restores the original thread after the await. It would be different if you had, for example, a Windows Forms app. These applications start by installing a specialized scheduler on the UI thread, called WindowsFormsSynchronizationContext, which intervenes after the await, and schedules the continuation back on the UI thread. You don't have such a thing in a Console application, so you are experiencing the effects of asynchrony in its purest form.
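
The effect is easy to observe in isolation. The following is a minimal sketch, not the asker's code: `ThreadHopDemo` and `ObserveAsync` are hypothetical names, and `Task.Delay` stands in for any truly asynchronous operation such as `ReadAsync`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadHopDemo
{
    // Returns the managed thread IDs observed before and after an await.
    public static async Task<(int Before, int After)> ObserveAsync()
    {
        int before = Thread.CurrentThread.ManagedThreadId;

        // Stand-in for an asynchronous I/O call. With no SynchronizationContext
        // installed, the continuation is scheduled on a ThreadPool thread.
        await Task.Delay(50);

        int after = Thread.CurrentThread.ManagedThreadId;
        return (before, after);
    }
}
```

Called from a console application, the two IDs will often differ, although nothing guarantees that they do on any particular run. Neither thread is destroyed: the first one simply returns to its caller at the `await`, and the rest of the method is picked up by whichever pool thread is free.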

How do I prevent it?

By not having asynchronous execution flows. Just wait synchronously for all the asynchronous operations, and you'll stay on the same thread from start to finish:

ReadOnlySequence<byte>? msgpack = streamReader
    .ReadAsync(cancellationToken).GetAwaiter().GetResult();

If you find it tiresome to write .GetAwaiter().GetResult() everywhere, you can shorten it to .Wait2() with these two extension methods:

public static void Wait2(this Task task) => task.GetAwaiter().GetResult();
public static TResult Wait2<TResult>(this Task<TResult> task) => task.GetAwaiter().GetResult();

Might this be impacting performance?

It might impact memory efficiency. Your threads will be blocked during the asynchronous operations, so your program might need more threads than usual. This could have some temporary effects on performance if the ThreadPool becomes saturated and needs to spawn more threads. The thread-injection heuristics are a bit conservative, injecting at most one new thread per second. If this is a problem, you can configure the ThreadPool in advance with the ThreadPool.SetMinThreads method.
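
As a rough sketch of that configuration, assuming you know how many threads you intend to block (three in the question), the minimum can be raised once at startup. `ThreadPoolSetup` and `ReserveWorkers` are hypothetical names; `ThreadPool.GetMinThreads` and `ThreadPool.SetMinThreads` are the real APIs.

```csharp
using System;
using System.Threading;

public static class ThreadPoolSetup
{
    // Raises the worker-thread minimum by extraWorkers and leaves the
    // I/O completion-port minimum unchanged.
    // Returns what SetMinThreads reports: false means the request was rejected.
    public static bool ReserveWorkers(int extraWorkers)
    {
        ThreadPool.GetMinThreads(out int workers, out int completionPorts);
        return ThreadPool.SetMinThreads(workers + extraWorkers, completionPorts);
    }
}
```

Calling `ThreadPoolSetup.ReserveWorkers(3)` before starting the replay threads lets the pool create up to that many additional threads on demand instead of throttling their creation.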

Note: Blocking the current thread with .GetAwaiter().GetResult() is not a good way to write code in general. It violates the common wisdom of not blocking on async code. Here I am just directly answering your question about how to prevent the thread from changing. I am not advising you to actually do it. If you asked for my advice, I would say to rethink everything that you have done so far, and maybe restart your project from scratch.
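
One possible direction for such a rethink, following the suggestion in the comments to pass a different object to each invocation, is to give every replay flow its own state object instead of keying shared dictionaries by thread ID. This is only a sketch under assumptions: `ReplayState`, `ReplayFlows`, `AdvanceAsync` and `RunFlowAsync` are hypothetical names, and `Task.Delay` stands in for the MessagePack read.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical per-flow state, replacing the dictionaries keyed by ManagedThreadId.
public sealed class ReplayState
{
    public List<long> StreamPositions { get; } = new List<long>();
    public List<string> CollectionNames { get; } = new List<string>();
}

public static class ReplayFlows
{
    // Simulates reading one collection: awaits, then advances this flow's position.
    public static async Task AdvanceAsync(ReplayState state, int i, long bytesRead)
    {
        await Task.Delay(10); // stand-in for streamReader.ReadAsync

        // The continuation may run on another thread, but 'state' is still
        // the object that belongs to this flow.
        state.StreamPositions[i] += bytesRead;
    }

    // Runs one flow over its own collections and returns its final state.
    public static async Task<ReplayState> RunFlowAsync(string[] names, long bytesPerRead)
    {
        var state = new ReplayState();
        foreach (string name in names)
        {
            state.CollectionNames.Add(name);
            state.StreamPositions.Add(0);
        }

        for (int i = 0; i < state.CollectionNames.Count; i++)
        {
            await AdvanceAsync(state, i, bytesPerRead);
        }

        return state;
    }
}
```

Because the state travels with the call rather than with the thread, it no longer matters which thread resumes after each `await`. `AsyncLocal<T>` is another option when threading a parameter through every call is impractical.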

Theodor Zoulias