I am trying to implement a kind of device driver in a reactive style. The idea is to have a thread fully occupied in exchanging and processing data from the device, and have a Subject
to expose output data by firing OnNext
events in the device's thread. I want my OnNext
methods to be evaluated in the Main Thread ID 1. This is crucial for me, as, in fact, I need to collect data simultaneously from several devices. The timing of each new data sample is almost random. The Main Thread, or let's say Main Scheduler
, will be occupied with this complex timeline of data of different sorts. That is why I need to be sure that OnNext
is observed on the Main Thread.
Look at this toy example
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
class RxDeviceDriver
{
public Subject<int> EventSubject;
public RxDeviceDriver()
{
EventSubject = new Subject<int>();
var driverThread = new Thread(() =>
{
Console.WriteLine("Opened device's thread {0}", Thread.CurrentThread.ManagedThreadId);
int i = 1;
while (true)
{
Console.WriteLine("Recieved new value {0}, thread {1}", i, Thread.CurrentThread.ManagedThreadId);
EventSubject.OnNext(i++);
Thread.Sleep(1000);
}
});
driverThread.Start();
}
}
class ToyTest
{
public static void Main()
{
Console.WriteLine("Starting on main thread {0}", Thread.CurrentThread.ManagedThreadId);
RxDeviceDriver newDevice = new RxDeviceDriver();
Thread.Sleep(2500);
newDevice.EventSubject
.ObserveOn(Scheduler.Default)
.Subscribe(v => Console.WriteLine("Processing value {0} on thread {1}", v, Thread.CurrentThread.ManagedThreadId));
}
}
It makes the following output:
Starting on main thread 1
Opened device's thread 10
Recieved new value 1, thread 10
Recieved new value 2, thread 10
Recieved new value 3, thread 10
Recieved new value 4, thread 10
Processing value 4 on thread 12
Recieved new value 5, thread 10
Processing value 5 on thread 12
It seems that Scheduler.Default
is trying to be fast and reduce the concurrency, and this is equal to ThreadPoolScheduler
in fact. I can have some control over observing thread, and I can even choose my own EventLoopScheduler
, but this is not what I need, a Main Thread. A similar question was asked for WinForms application and UI thread synchronization. The working answer is to pass SynchronzationContext
of the UI as an argument of the ObserveOn
method. But in my case, where this layer of the model should not have any concern about the UI and has a null SynchronzationContext
value, how can I force to execute OnNext
on the Main Thread?
Maybe if you have any other ideas on how to implement this kind of driver in a reactive style, they are also welcome.