I'm connected to a device over serial (RS-232). As soon as the connection is established, the device starts pumping sensor-specific data back to the client.

I have two goals: a) receive the continuous stream of sensor status updates and hand it off to the UI, and b) send a command and wait for the reply.

var messageObserver = Observable.Defer(() => serialPort.TakeWhile(
                                                 s => s != string.Empty))
                                .Repeat();

From here, I'd like to start processing the sensor-specific data. Currently I'm trying this:

this.messageObserver.Where(s => Regex.Match(s, @"...").Success)
                    .Subscribe(o => OnDataArrival(o));

This works great until I want to execute code like this:

public string GetFirmwareVersion()
{
    var firmwareObserver =
         this.messageObserver.Where(s => Regex.Match(s, @"...").Success)
                             .Take(1)
                             .PublishLast();

    var connectable = firmwareObserver.Connect();

    // send command for firmware version
    port.Send(new byte[] { 0x9 });

    // wait for a reply up to 10 seconds
    var data = firmwareObserver.Timeout(TimeSpan.FromSeconds(10)).Wait();
    connectable.Dispose();

    return data;
}

I never receive anything back, and an exception is thrown (obviously from the Timeout). If I comment out the initial subscriber (the one that fires OnDataArrival), the GetFirmwareVersion code works!

So I'm looking for recommended ways to accomplish two goals: a) process data coming in over the wire as it's being received, and b) connect and wait for a reply while we still process incoming data.

James World
hwnd

2 Answers

I think you are seeing side-effects from having multiple subscribers to the source stream. If you are running both the subscriber firing OnDataArrival and the GetFirmwareVersion code together, then you will end up running two concurrent queries against the serialPort.
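To make the point about multiple subscribers concrete, here is a minimal sketch, assuming the System.Reactive package. The `Observable.Defer` source is a hypothetical stand-in for the serial port wrapper, not the asker's actual class; it simply counts how often its subscribe logic runs when two independent queries are attached to it.

```csharp
using System;
using System.Reactive.Linq;

public static class ColdSourceDemo
{
    // Returns how many times the source's subscribe logic ran.
    public static int CountSourceSubscriptions()
    {
        var subscriptions = 0;

        // Hypothetical stand-in for the serial port: a cold source that
        // counts every subscription made against it.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Return("message");
        });

        // Two independent queries, analogous to the OnDataArrival
        // subscriber and the GetFirmwareVersion query.
        using (source.Subscribe(_ => { }))
        using (source.Where(s => s.Length > 0).Take(1).Subscribe(_ => { }))
        {
            return subscriptions; // 2: each query subscribed to the source separately
        }
    }
}
```

With a real port behind the source, each of those two subscriptions would be reading from the same device, which is the likely reason one of them never sees the reply.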

Whilst you are publishing within the GetFirmwareVersion code, you must publish one source from the serialPort and share it across all subscribers. So what you need to do is publish the messageObserver, and use that in both queries, and connect it when all subscribers have subscribed. You can connect before all subscribers are subscribed (and your scenario may demand this, if you want to get the firmware version at some arbitrary point after you've started reading from the port) - just be aware you may miss events.

Alternative publishing operators exist to avoid this, like Replay() for example. Here is the basic Publish-Connect usage:

 var messageObserver = Observable.Defer(
     () => serialPort.TakeWhile(s => s != string.Empty)).Repeat().Publish();

Then, after subscribing, call:

 messageObserver.Connect();

There are numerous ways to manage publishing and connecting depending on your scenario, but this is the basic approach. Now the subscribers will share a single subscription to the serial port.
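The following is a rough sketch of how the two goals from the question might share one published stream, assuming the System.Reactive package. The `Subject<string>` stands in for the serial port wrapper, and the `S:` and `FW:` prefixes are made-up placeholders for the elided regular expressions; none of these names come from the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedPortDemo
{
    public static (List<string> Sensor, string Firmware) Run()
    {
        // Hypothetical stand-in for the serial port wrapper.
        var port = new Subject<string>();

        // One published stream, shared by every subscriber.
        IConnectableObservable<string> messages = port.Publish();

        // Goal a): continuous sensor updates.
        var sensor = new List<string>();
        using var sensorSub = messages
            .Where(s => s.StartsWith("S:", StringComparison.Ordinal))
            .Subscribe(s => sensor.Add(s));

        // Connect once, after the long-lived subscriber is attached.
        using var connection = messages.Connect();

        // Goal b): attach the reply query before sending the command,
        // so the reply cannot slip past unobserved.
        var reply = messages
            .Where(s => s.StartsWith("FW:", StringComparison.Ordinal))
            .Take(1)
            .PublishLast();
        using var replyConnection = reply.Connect();

        // Simulated device traffic; a real caller would send the command here.
        port.OnNext("S:1");
        port.OnNext("FW:1.0");
        port.OnNext("S:2");

        var firmware = reply.Timeout(TimeSpan.FromSeconds(10)).Wait();
        return (sensor, firmware);
    }
}
```

Both queries see the same messages because they hang off the single `Publish()` connection, and the sensor subscriber keeps receiving updates while the firmware query waits for its reply.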

James World
  • For variations on the publish theme see `Publish().RefCount()`, the `Publish(Func<IObservable<TSource>, IObservable<TResult>>)` overload and `Replay()` to name a few - plus the raw `Multicast()` utility operator. – James World Nov 10 '14 at 17:37
What has worked for me:

private readonly ObservableSerialPort port; // serial port wrapper
private readonly IObservable<string> messageObserver;
private readonly Subject<string> subject;

In my class's constructor:

    this.port = new ObservableSerialPort_string("COM4");
    this.messageObserver = Observable.Defer(() => port.TakeWhile(s => s != string.Empty))
                                     .Select(s => new Regex(@"...", RegexOptions.CultureInvariant).Replace(s, string.Empty))
                                     .Repeat();

    subject = new Subject<string>();
    var subSource = messageObserver.Subscribe(subject);

    subject.Subscribe(x => Console.WriteLine("raw: " + x));

    subject.Where(s => Regex.Match(s, @"...").Success)
           .Subscribe(x => Console.WriteLine("gauge: " + x));

Now I'm able to run code like this:

    public string GetFirmwareVersion()
    {
        var f = subject.Where(s => Regex.Match(s, @"...").Success)
                       .Take(1)
                       .PublishLast();
        var c = f.Connect();


        // send command for firmware version
        port.Send(new byte[] { 0x9 });

        var data = f.Timeout(TimeSpan.FromSeconds(10)).Wait();
        c.Dispose();

        return data;
    }

    public Config GetConfiguration()
    {
        using (var subject2 = new Subject<Config>())
        {

            var f = subject.Where(s => Regex.Match(s, @"...").Success)
                           .Select(s =>
                           {
                               // get data via xmodem
                               Thread.Sleep(500);
                               var modem = new Modem(this.port._serialPort);
                               var bytes = modem.XModemReceive(true);
                               subject2.OnNext(new DeviceConfig(bytes));
                               subject2.OnCompleted();
                               return s;
                           })
                           .Take(1)
                           .PublishLast();


            var c = f.Connect();

            // send command for configuration
            port.Send(new byte[] { 0x2 });

            var ret = subject2.Timeout(TimeSpan.FromSeconds(60)).Wait();
            c.Dispose();

            return ret;
        }
    }

@James, you're probably bang on. Being new to Rx, the best way I could describe my previous result(s) / problem(s) would be similar to thread-blocking. Using the Subject class has made a world of difference.

hwnd