
I am trying to implement a kind of device driver in a reactive style. The idea is to have one thread fully occupied with exchanging and processing data from the device, and a Subject that exposes the output data by firing OnNext from the device's thread. I want my OnNext handlers to run on the main thread (thread ID 1). This is crucial for me because I need to collect data from several devices simultaneously, and the timing of each new data sample is almost random. The main thread, or let's say the main scheduler, will be occupied with this complex timeline of data of different sorts. That is why I need to be sure that OnNext is observed on the main thread.

Look at this toy example:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class RxDeviceDriver
{
    public Subject<int> EventSubject;
    public RxDeviceDriver()
    {
        EventSubject = new Subject<int>();

        var driverThread = new Thread(() =>
        {
            Console.WriteLine("Opened device's thread {0}", Thread.CurrentThread.ManagedThreadId);
            int i = 1;
            while (true)
            {
                Console.WriteLine("Recieved new value {0}, thread {1}", i, Thread.CurrentThread.ManagedThreadId);
                EventSubject.OnNext(i++);
                Thread.Sleep(1000);
            }
        });
        driverThread.Start();

    }
}

class ToyTest
{
    public static void Main()
    {

        Console.WriteLine("Starting on main thread {0}", Thread.CurrentThread.ManagedThreadId);

        RxDeviceDriver newDevice = new RxDeviceDriver();

        Thread.Sleep(2500);

        newDevice.EventSubject
            .ObserveOn(Scheduler.Default)
            .Subscribe(v => Console.WriteLine("Processing value {0} on thread {1}", v, Thread.CurrentThread.ManagedThreadId));

    }
}

It produces the following output:

Starting on main thread 1
Opened device's thread 10
Recieved new value 1, thread 10
Recieved new value 2, thread 10
Recieved new value 3, thread 10
Recieved new value 4, thread 10
Processing value 4 on thread 12
Recieved new value 5, thread 10
Processing value 5 on thread 12

It seems that Scheduler.Default tries to be fast and to reduce concurrency, and in fact it is equivalent to ThreadPoolScheduler. I have some control over the observing thread, and I can even choose my own EventLoopScheduler, but neither gives me what I need: the main thread. A similar question was asked about WinForms applications and UI thread synchronization. The working answer there is to pass the SynchronizationContext of the UI as an argument to the ObserveOn method. But in my case this layer of the model should not have any concern about the UI, and SynchronizationContext.Current is null. How can I force OnNext to execute on the main thread?
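To make the SynchronizationContext idea concrete for a console application, here is a minimal sketch of a context that queues posted callbacks and runs them on whichever thread calls its pump method. The class name PumpingSynchronizationContext and the methods RunOnCurrentThread and Complete are hypothetical names invented for this illustration; they are not part of Rx or the BCL. The sketch assumes that the ObserveOn(SynchronizationContext) overload delivers notifications through Post, and it leaves Send unsupported.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical class for illustration; not part of Rx or the BCL.
sealed class PumpingSynchronizationContext : SynchronizationContext
{
    // Thread-safe queue of pending callbacks together with their state objects.
    private readonly BlockingCollection<(SendOrPostCallback Callback, object State)> _work =
        new BlockingCollection<(SendOrPostCallback Callback, object State)>();

    // May be called from any thread: the callback is queued, not executed here.
    public override void Post(SendOrPostCallback d, object state) => _work.Add((d, state));

    // Synchronous dispatch is deliberately left out of this sketch.
    public override void Send(SendOrPostCallback d, object state) =>
        throw new NotSupportedException("Send is not supported in this sketch.");

    // Call this on the thread that should execute the callbacks.
    // It blocks until Complete() has been called and the queue is drained.
    public void RunOnCurrentThread()
    {
        foreach (var (callback, state) in _work.GetConsumingEnumerable())
        {
            callback(state);
        }
    }

    // Signals that no more callbacks will be posted, so the pump loop can end.
    public void Complete() => _work.CompleteAdding();
}
```

Under these assumptions, Main could create one instance, subscribe with newDevice.EventSubject.ObserveOn(context).Subscribe(...), and then call context.RunOnCurrentThread() as its last statement, so that the handlers run on thread 1 while the device thread keeps calling OnNext. The trade-off is that the main thread is then dedicated to the pump loop.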

Maybe if you have any other ideas on how to implement this kind of driver in a reactive style, they are also welcome.

  • I think you are overanalyzing this. You can make your own "event loop" with a BlockingCollection that messages can be posted to. Or use something like Dataflow. Or use a [LimitedConcurrencyTaskScheduler](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-7.0) with a limit of one. – JonasH Jul 26 '23 at 14:51
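The "event loop" with a blocking collection mentioned in the comment could look roughly like the sketch below. It uses no Rx at all: a producer thread stands in for the device thread, and the calling thread drains the queue. MainThreadPump and Run are hypothetical names made up for this example, and the 10 ms sleep is an arbitrary stand-in for the device's timing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper for illustration only.
static class MainThreadPump
{
    // Starts a producer thread that posts `count` values into a queue,
    // then drains the queue on the calling thread.
    // Returns each value together with the ID of the thread that handled it.
    public static List<(int Value, int ThreadId)> Run(int count)
    {
        var queue = new BlockingCollection<int>();
        var processed = new List<(int Value, int ThreadId)>();

        var producer = new Thread(() =>
        {
            for (int i = 1; i <= count; i++)
            {
                queue.Add(i);        // stands in for EventSubject.OnNext(i)
                Thread.Sleep(10);    // arbitrary delay between samples
            }
            queue.CompleteAdding();  // lets the consuming loop below finish
        });
        producer.IsBackground = true;
        producer.Start();

        // Blocks the calling thread; every item is handled on this thread.
        foreach (int value in queue.GetConsumingEnumerable())
        {
            processed.Add((value, Thread.CurrentThread.ManagedThreadId));
        }

        producer.Join();
        return processed;
    }
}
```

Calling MainThreadPump.Run(5) from Main should report the main thread's ID for every value. With the Subject from the question, the forwarding step could presumably be EventSubject.Subscribe(queue.Add), with one shared queue for several devices.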

0 Answers