
I'm writing a .NET app that will need to talk to a serial port device. The device is basically a transmitter for some old-school alphanumeric pagers. Occasionally, my app will need to open up a serial port and send a message to the transmitter.

I know the protocol for talking to the device. It's a bit of a back and forth "chatty" protocol. Send a command ... wait for a particular response ... send another command ... wait for another particular response ... send the actual message ... wait for an "accepted" response. I can get this to work with some really hacky code involving a series of Write(...) method calls on a SerialPort object, with Thread.Sleep calls in between.

Of course, I don't want to actually do this by relying on Thread.Sleep to wait for the device to respond. It seems like the Reactive Extensions framework should be suited for this type of thing, but I'm having trouble getting my head around it. I started with this, but quickly got lost and wasn't sure where to go next, or if this even makes sense:

var receivedData = Observable.FromEventPattern<SerialDataReceivedEventArgs>(serialPort, "DataReceived");
receivedData
    .Where(d => d.EventArgs.EventType == SerialData.Chars)
    .Subscribe(args =>
            {
                var response = serialPort.ReadExisting();
                // Now what?
            });

First, how do I kick this thing off with the first serialPort.Write() call? Then how do I chain them together by checking for the expected response before issuing the next Write() call? And of course if I don't get the expected response I'd want to break out and throw an exception or something. Am I even barking up the right tree with Rx, or is there another pattern that would be better suited for this? Thanks!

Kevin Kuebler

3 Answers


Rx is an abstraction over scenarios in which a data source pushes data. In your case, you have modeled the serial port's read side as an Rx observable, and this needs to be combined with the serial port's write method. One possible solution is something like the code below, although it may require modifications based on the specific needs of your application.

            var serialPort = new System.IO.Ports.SerialPort("COM1");
            serialPort.Open();
            // FromEventPattern yields EventPattern<T> items, which expose d.EventArgs.
            var receivedData = Observable.FromEventPattern<SerialDataReceivedEventArgs>(serialPort, "DataReceived")
                               .Where(d => d.EventArgs.EventType == SerialData.Chars)
                               .Select(_ => serialPort.ReadExisting());
            // Record every response so that none is lost between writes.
            var replay = new ReplaySubject<string>();
            receivedData.Subscribe(replay);
            var commands = new List<string> { "Init", "Hello", "Done" };
            // ToList() forces the lazy Select to run, sending each command in order.
            commands.Select((s, i) =>
            {
                serialPort.Write(s);
                // Blocks until the (i + 1)-th response has arrived.
                var read = replay.Skip(i).First();
                // Validate the response in read against the command s here
                return true; // Return some value to indicate success or failure
            }).ToList();
Ankur
  • I only somewhat understand this, but I thought I'd try to implement it to see if it worked and, if so, would work through it to increase my understanding. However, there's a compile error on the "receivedData.Subscribe(replay);" line. The compiler says "cannot convert from 'System.Reactive.Subjects.ReplaySubject' to 'System.IObserver>" – Kevin Kuebler Sep 09 '11 at 01:47
  • I had a mistake on the previous line, I fixed that and the code now compiles. And it seems to work, although it only worked once. Running it more than once the app would always hang and I never received a good response from the device. Not sure what's going on there - I'm closing and disposing the SerialPort object at the end of the method. But that's probably a separate issue. This appears to be a good approach overall. Thanks! – Kevin Kuebler Sep 09 '11 at 03:00
  • I had a serial port spy app running which I believe may have caused the hangs. The code is working now, although I ended up still using a Thread.Sleep() call right after the serialPort.Write(s) call. It seems to need just a bit of time to process the command and send back the response, which makes sense. If I understand correctly, the DataReceived event may get fired multiple times before all the data is received, which is likely the issue. – Kevin Kuebler Sep 09 '11 at 03:27
  • Yes, the DataReceived event fires whenever some data is available, and that may be only partial data as far as the device's protocol is concerned, so you may need to do some sort of aggregation on the received-data observable to create the complete message – Ankur Sep 09 '11 at 04:03
  • Actually, I seem to be able to get part way there by setting the NewLine property on the serialPort and calling serialPort.ReadLine() instead of ReadExisting(). But that only partially works, because if I remove the Thread.Sleep after the write I still lose some of the responses. Any ideas on how to adjust this so that the sleep isn't needed? It just seems like a bit of a hack with that in there. But otherwise the code is working, so thanks again! – Kevin Kuebler Sep 09 '11 at 04:57
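
On the partial-data point raised in these comments, one possible way to do the aggregation is to buffer the chunks returned by ReadExisting() and only emit text once a terminator has been seen. The following is a minimal sketch, not the answerer's code: the LineFramer class and its Push method are hypothetical names, and the "\r" terminator is an assumption that would have to match the real device protocol.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: accumulates arbitrary chunks of text and returns
// only complete, terminator-delimited messages.
public sealed class LineFramer
{
    private readonly StringBuilder _buffer = new StringBuilder();
    private readonly string _terminator;

    // The "\r" default is an assumption; use whatever the device really sends.
    public LineFramer(string terminator = "\r")
    {
        if (string.IsNullOrEmpty(terminator))
            throw new ArgumentException("Terminator must not be empty.", nameof(terminator));
        _terminator = terminator;
    }

    // Appends a chunk and returns every message completed by it (possibly none).
    public IEnumerable<string> Push(string chunk)
    {
        _buffer.Append(chunk);
        var complete = new List<string>();
        string pending = _buffer.ToString();
        int index;
        while ((index = pending.IndexOf(_terminator, StringComparison.Ordinal)) >= 0)
        {
            complete.Add(pending.Substring(0, index));
            pending = pending.Substring(index + _terminator.Length);
        }
        // Keep the unterminated remainder for the next chunk.
        _buffer.Clear().Append(pending);
        return complete;
    }
}
```

With something like this in place, the observable from the answer could be extended with receivedData.SelectMany(chunk => framer.Push(chunk)), so that the ReplaySubject records whole responses rather than fragments, which may remove the need for the Thread.Sleep.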

The workflow of sending and receiving can be nicely handled by combining Rx and async/await. I blogged about this at http://kerry.lothrop.de/serial-rx/.
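
As a rough illustration of the idea (this is a sketch under my own assumptions, not the code from the linked post): each protocol step can be written as "subscribe for the next response, write the command, await the response with a timeout". The PagerDialog class and ExchangeAsync method are hypothetical names, and the responses observable is assumed to emit one complete device response per element. It requires the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class PagerDialog
{
    // Sends one command and asynchronously waits for the next response.
    // Throws TimeoutException if nothing arrives within 'timeout', and
    // InvalidOperationException if the response is not the expected one.
    public static async Task<string> ExchangeAsync(
        IObservable<string> responses,
        Action<string> write,
        string command,
        string expected,
        TimeSpan timeout)
    {
        // ToTask() subscribes immediately, so the subscription is in place
        // BEFORE the write and a fast reply cannot be missed.
        Task<string> pending = responses.FirstAsync().Timeout(timeout).ToTask();

        write(command);

        string reply = await pending;
        if (reply != expected)
            throw new InvalidOperationException(
                $"Sent '{command}', expected '{expected}', got '{reply}'.");
        return reply;
    }
}
```

A chatty protocol then becomes a plain sequence of awaits, for example await PagerDialog.ExchangeAsync(responses, serialPort.Write, "Init", "OK", TimeSpan.FromSeconds(2)); followed by the next step. The command and response strings here are placeholders, not the real pager protocol.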

kwl

I don't find Rx to be a great fit for this kind of serial communication. In general, Rx seems to be more about one-way streams of data than back-and-forth protocols. For serial communications like this, I wrote a class around the serial port that uses WaitHandles to wait for a response to commands. The general structure is:

The application calls a method to launch an async operation that sends a sequence of commands. This launches a thread (from the thread pool, I believe) that sends each command in turn. Once a command is sent, the operation waits on a WaitHandle for the response (or times out and then either retries or fails the operation). When the response has been processed, the WaitHandle that the receiver is waiting on is signaled and the next command is sent.

The serial receive event (which runs on background threads whenever data comes in) builds up packets of data. When a complete packet is received, it checks whether a command was sent. If so, it signals the sending thread that a response has arrived and then waits on a different WaitHandle until the response has been processed (which can be important to prevent the receiver from overwriting the response data).
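
For readers more comfortable in C#, the send side of this structure can be reduced to the following sketch. It is a simplification under stated assumptions, not a translation of my class: CommandChannel, OnPacket and Send are hypothetical names, packets are plain strings, and the second WaitHandle (the one that makes the receiver wait until the response has been processed) is left out.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified command/response channel built on a WaitHandle.
public sealed class CommandChannel
{
    private readonly AutoResetEvent _responseArrived = new AutoResetEvent(false);
    private readonly object _gate = new object();
    private readonly Action<string> _write;
    private string _lastResponse;

    public CommandChannel(Action<string> write)
    {
        _write = write ?? throw new ArgumentNullException(nameof(write));
    }

    // Called by the receive side once a complete packet has been assembled.
    public void OnPacket(string packet)
    {
        lock (_gate) { _lastResponse = packet; }
        _responseArrived.Set(); // wake the sender
    }

    // Sends a command and blocks until a response arrives, resending on timeout.
    public string Send(string command, TimeSpan timeout, int maxTries)
    {
        _responseArrived.Reset(); // discard any stale signal
        for (int attempt = 1; attempt <= maxTries; attempt++)
        {
            _write(command);
            if (_responseArrived.WaitOne(timeout))
            {
                lock (_gate) { return _lastResponse; }
            }
        }
        throw new TimeoutException(
            $"No response to '{command}' after {maxTries} tries.");
    }
}
```

Send would normally be called from a background thread, with the DataReceived handler feeding OnPacket once its packet builder reports a complete packet.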

EDIT: Added a (somewhat large) sample showing the two core send and receive methods.

Not shown is the Me.Receiver property, which is of type ISerialReceiver and is responsible for building packets of data but not for determining whether the data is the correct response. Also not shown are CheckResponse and ProcessIncoming, two abstract methods overridden by derived classes to determine whether the response matches the command just sent and to handle "unsolicited" incoming packets, respectively.

''' <summary>This field is used by <see cref="SendCommand" /> to wait for a
''' response after sending data.  It is set by <see cref="ReceiveData" />
''' when <see cref="ISerialReceiver.ProcessResponseByte">ProcessResponseByte</see>
''' on the <see cref="Receiver" /> returns true.</summary>
''' <remarks></remarks>
Private ReceiveResponse As New System.Threading.AutoResetEvent(False)
''' <summary>This field is used by <see cref="ReceiveData" /> to wait for
''' the response to be processed after setting <see cref="ReceiveResponse" />.
''' It is set by <see cref="SendCommand" /> when <see cref="CheckResponse" />
''' returns, regardless of the return value.</summary>
''' <remarks></remarks>
Private ProcessResponse As New System.Threading.ManualResetEvent(True)
''' <summary>
''' This field is used by <see cref="SendCommand" /> and <see cref="ReceiveData" />
''' to determine when an incoming packet is a response packet or if it is
''' one of a continuous stream of incoming packets.
''' </summary>
''' <remarks></remarks>
Private responseSolicited As Boolean

''' <summary>
''' Handles the DataReceived event of the wrapped SerialPort.
''' </summary>
''' <param name="sender">The wrapped SerialPort that raised the event.
''' This parameter is ignored.</param>
''' <param name="e">The event args containing data for the event</param>
''' <remarks>This function will process all bytes according to the
''' <see cref="Receiver" /> and allow <see cref="SendCommand" /> to
''' continue or will call <see cref="ProcessIncoming" /> when a complete
''' packet is received.</remarks>
Private Sub ReceiveData(ByVal sender As Object, ByVal e As SerialDataReceivedEventArgs)
    If e.EventType <> SerialData.Chars Then Exit Sub
    Dim input() As Byte

    SyncLock _portLock
        If Not _port.IsOpen OrElse _port.BytesToRead = 0 Then Exit Sub
        input = New Byte(_port.BytesToRead - 1) {}
        _port.Read(input, 0, input.Length)
    End SyncLock

    'process the received data
    If input Is Nothing OrElse input.Length = 0 OrElse Me.Receiver Is Nothing Then Exit Sub

    Dim responseCompleted As Boolean

    For i As Integer = 0 To input.Length - 1
        responseCompleted = Me.Receiver.ProcessResponseByte(input(i))

        'process completed response
        If responseCompleted Then
            responseSolicited = False
            System.Threading.WaitHandle.SignalAndWait(ReceiveResponse, ProcessResponse)

            'the data is not a response to a command sent by the decoder
            If Not responseSolicited Then
                ProcessIncoming(Me.Receiver.GetResponseData())
            End If
        End If
    Next
End Sub

''' <summary>
''' Sends a data command through the serial port.
''' </summary>
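''' <param name="data">The data to be sent out the port</param>
''' <param name="ignoreCancelled">If true, the data is sent even when the
''' operation has been cancelled.</param>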
''' <returns>The data received from the port or null if the operation
''' was cancelled.</returns>
''' <remarks>This function relies on the Receiver 
''' <see cref="ISerialReceiver.GetResponseData">GetResponseData</see> and 
''' the overridden <see cref="CheckResponse" /> to determine what response 
''' was received and if it was the correct response for the command.
''' <seealso cref="CheckResponse" /></remarks>
''' <exception cref="TimeoutException">The operation timed out.  The packet
''' was sent <see cref="MaxTries" /> times and no correct response was received.</exception>
''' <exception cref="ObjectDisposedException">The SerialTransceiver was disposed before
''' calling this method.</exception>
Private Function SendCommand(ByVal data() As Byte, ByVal ignoreCancelled As Boolean) As Byte()
    CheckDisposed()
    If data Is Nothing Then Return Nothing

    'make a copy of the data to ensure that it does not change during sending
    Dim sendData(data.Length - 1) As Byte
    Array.Copy(data, sendData, data.Length)

    Dim sendTries As Integer = 0
    Dim responseReceived As Boolean
    Dim responseData() As Byte = Nothing
    ReceiveResponse.Reset()
    ProcessResponse.Reset()
    While sendTries < MaxTries AndAlso Not responseReceived AndAlso _
          (ignoreCancelled OrElse Not Me.IsCancelled)
        'send the command data
        sendTries += 1
        If Not Me.WriteData(sendData) Then Return Nothing

        If Me.Receiver IsNot Nothing Then
            'wait for Timeout milliseconds for a response.  If no response is received
            'then waitone will return false.  If a response is received, the AutoResetEvent
            'will be triggered by the SerialDataReceived function to return true.
            If ReceiveResponse.WaitOne(Timeout, False) Then
                Try
                    'get the data that was just received
                    responseData = Me.Receiver.GetResponseData()
                    'check to see if it is the correct response
                    responseReceived = CheckResponse(sendData, responseData)
                    If responseReceived Then responseSolicited = True
                Finally
                    'allow the serial receive function to continue checking bytes
                    'regardless of if this function throws an error
                    ProcessResponse.Set()
                End Try
            End If
        Else
            'when there is no Receiver, assume that there is no response to
            'data sent from the transceiver through this method.
            responseReceived = True
        End If
    End While

    If Not ignoreCancelled AndAlso Me.IsCancelled Then
        'operation was cancelled, return nothing
        Return Nothing
    ElseIf Not responseReceived AndAlso sendTries >= MaxTries Then
        'operation timed out, throw an exception
        Throw New TimeoutException(My.Resources.SerialMaxTriesReached)
    Else
        'operation completed successfully, return the data
        Return responseData
    End If
End Function
Gideon Engelberth
  • Thanks for the answer. I'm pretty proficient with C# in general, but I honestly don't have much experience with async programming like this. That's why I was hoping Rx might simplify it somewhat. :-) But I think I see what you're saying re: the back-and-forth protocol. Do you have any specific code samples you could share on how this pattern is implemented? Thanks! – Kevin Kuebler Sep 09 '11 at 01:51
  • @Kevin Kuebler: see the edited post. This is just the send/receive functions, so you will have to add the async operation launching on top of `SendCommand`. I used `System.ComponentModel.AsyncOperationManager` to run the method on a background thread with progress reporting. – Gideon Engelberth Sep 09 '11 at 16:23
  • Thank you for this answer. Rx does not target back-and-forth protocols, but let's try a thought experiment. Let's make an observable of a stream (well, a sequence of them in the case of a TCP client that may have to reconnect sometimes, http://stackoverflow.com/questions/18978523/write-an-rx-retryafter-extension-method), observe it with an EventLoopScheduler for a separate thread, read the ill-grouped bunches of characters, and regroup them using Buffer(). All in all, no thread/`WaitHandle`, and from that maybe we can handle the core higher level of the protocol in an Observer method in fewer lines. Any thoughts? – Stéphane Gourichon Apr 15 '15 at 13:38