Currently our application connects to an Arduino over a serial port. We send some ASCII-formatted commands, and get the same in return. To do this, we have a queue of commands, a thread dedicated to writing those commands to the port, and a thread dedicated to reading and handling all incoming replies. The class itself is responsible for dispatching the replies, which is giving it way too much responsibility (should just be responsible for port operations, not business logic).

We would rather do this in an async manner. Anything in the system can send a command with a callback function and a timeout. If the serial port gets a correct reply, it calls the callback function. Otherwise, it times out and maybe calls a second callback (or possibly a single callback with a succeeded? flag).

However, we've only ever consumed async methods (particularly in web operations), not written such a system. Can anyone give us some pointers about how to proceed?

Our current plan is to store a queue of these commands. Upon any reply, if an associated command is found (by comparing ASCII values) it is dequeued and the callback is executed. A timer will periodically check for timeouts, dequeue, and execute the appropriate callback. It seems like a straightforward solution, but the amount of code to support this is increasing substantially and we wanted to ensure there weren't any better built-in solutions or best practices for this.
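The plan above can be sketched roughly as follows. This is only a minimal illustration, not existing code: the class name `PendingCommandTable`, its methods, and the assumption that a reply begins with the text of the command it answers are all hypothetical. A timer (for example `System.Threading.Timer`) would call `ExpireTimedOut` periodically, and the serial reader thread would call `Complete` for each reply.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: all names here are illustrative, not part of any library.
public sealed class PendingCommandTable
{
    private sealed class Pending
    {
        public string Command;
        public DateTime Deadline;
        public Action<bool, string> Callback; // (succeeded, reply text or null)
    }

    private readonly object _gate = new object();
    private readonly List<Pending> _pending = new List<Pending>();

    // Called by any thread just before the command is written to the port.
    public void Register(string command, TimeSpan timeout, Action<bool, string> callback)
    {
        var entry = new Pending
        {
            Command = command,
            Deadline = DateTime.UtcNow + timeout,
            Callback = callback
        };
        lock (_gate) { _pending.Add(entry); }
    }

    // Called by the reader thread. ASSUMPTION: a reply starts with the command it answers.
    public bool Complete(string reply)
    {
        Pending match = null;
        lock (_gate)
        {
            int i = _pending.FindIndex(p => reply.StartsWith(p.Command, StringComparison.Ordinal));
            if (i >= 0)
            {
                match = _pending[i];
                _pending.RemoveAt(i);
            }
        }
        if (match == null) return false;
        match.Callback(true, reply); // invoked outside the lock
        return true;
    }

    // Called periodically to fail every command whose deadline has passed.
    public int ExpireTimedOut(DateTime nowUtc)
    {
        List<Pending> expired;
        lock (_gate)
        {
            expired = _pending.FindAll(p => p.Deadline <= nowUtc);
            _pending.RemoveAll(p => p.Deadline <= nowUtc);
        }
        foreach (var p in expired) p.Callback(false, null);
        return expired.Count;
    }
}
```

Because the table is searched by command rather than consumed in order, replies arriving in a different order than the commands were sent still reach the right callback.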

Edit: To clarify further, this particular class is a singleton (for better or worse), and there are many other threads running that could access it. For example, one thread may want to request a sensor value, while another thread could be controlling a motor. These commands and their associated replies do not happen in a linear fashion; the timing may be reversed. Thus, a traditional producer-consumer model is not enough; this is more of a dispatcher.

For example, let's call this singleton class Arduino. Thread A is running, and wants to send a command "*03", so it calls Arduino.Instance.SendCommand("*03"). Meanwhile, Thread B sends a command "*88", both of which get sent in near-realtime. Sometime later the Arduino's SerialPort.Read() thread picks up a reply for *88 and then a reply for *03 (i.e. in the opposite order they were sent). How do we allow both Thread A and Thread B to block correctly waiting on the specific reply to come in? We're assuming we will use AutoResetEvent inside each thread, with an async callback to let us .Set it.

drharris

2 Answers

3

If performance is what you're after, and async at its finest level, I suggest looking into I/O Completion Ports. This is what ultimately sits underneath, hidden in the Windows kernel, and it's awesome. When I used them I was working in C++ (and even found a kernel bug because of it), but I was limited to that language.

I've seen this article on CodeProject which might be worth exploring to see where you can take your idea further and/or use the code that's there.

The nature of completion ports is to work on callbacks. That is, in general, you "put" a request in the queue, and when something lands there, the request is read and the specified callback is invoked. It is, in fact, a queue, but like I said, at the lowest (manageable) level (just before getting down to the metal).

EDIT: I've written a sort of a FTP server/client testing utility with Completion ports, so the base process is the same - reading and writing of commands in a queuable fashion. Hope it helps.

EDIT #2: Ok, here's what I would do, based on your feedback and comments. I would have an "outgoing queue", ConcurrentQueue<Message>. You can have a separate thread for sending messages by dequeueing each message. Note, if you want it a bit more "safe", I suggest peeking at the message, sending it, then dequeuing it. Anyway, the Message class can be internal, and look something like this:

private class Message {
    public string Command { get; set; }
    // ... additional properties, like timeouts, etc. ...
}

In the singleton class (I'll call it CommunicationService), I'd also have a ConcurrentBag<Action<Response>>. This is now where the fun starts :o). When a separate concern wants to do something, it registers itself. For example, if you have a TemperatureMeter, I would have it do something like this:

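public class TemperatureMeter {
   private readonly AutoResetEvent _signal = new AutoResetEvent(false);
   private readonly ConcurrentQueue<decimal> _results = new ConcurrentQueue<decimal>();

   public TemperatureMeter() {
     CommunicationService.AddHandler(HandlePotentialTemperatureResponse);
   }

   // Matches Action<Response>; called for every incoming response.
   public void HandlePotentialTemperatureResponse(Response response) {
     // if response is not what I'm looking for, ignore it
     // (IsTemperatureResponse is a hypothetical helper)
     if (!IsTemperatureResponse(response)) return;

     // store the result first, then wake up the waiting thread
     // (ParseTemperature is a hypothetical helper)
     _results.Enqueue(ParseTemperature(response));
     _signal.Set();
   }

   public decimal ReadTemperature() {
     CommunicationService.SendCommand(Commands.ReadTemperature);
     if (!_signal.WaitOne(Commands.ReadTemperature.TimeOut)) // or smth like this
       throw new TimeoutException("No temperature reply received.");

     decimal value;
     _results.TryDequeue(out value); // value stored by the handler above
     return value;
   }

}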

And now, in your CommunicationService, when you receive a response, you simply do a

foreach(var action in this._callbacks) {
   action(rcvResponse);
}

Voila, separation of concerns. Does it answer your question any better?

Another possible tactic would be to couple the message and the callback, with the callback being a Func<Response, bool>. The dispatcher thread checks the result returned from the Func, and if it is true, the callback is removed.
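A rough sketch of that second tactic, under stated assumptions: `ResponseDispatcher`, `HandlerCount`, and the one-field `Response` stand-in are hypothetical names used only for illustration. Each handler returns true once it has seen the reply it was waiting for, and the dispatcher then drops it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the Response type used above.
public sealed class Response
{
    public string Text;
}

// Sketch only: ResponseDispatcher and its members are illustrative names.
public sealed class ResponseDispatcher
{
    private readonly object _gate = new object();
    private readonly List<Func<Response, bool>> _callbacks = new List<Func<Response, bool>>();

    public void AddHandler(Func<Response, bool> callback)
    {
        lock (_gate) { _callbacks.Add(callback); }
    }

    public int HandlerCount
    {
        get { lock (_gate) { return _callbacks.Count; } }
    }

    // Called by the reader thread for every reply that arrives.
    public void Dispatch(Response response)
    {
        // Work on a snapshot so handlers can be added or removed while dispatching.
        Func<Response, bool>[] snapshot;
        lock (_gate) { snapshot = _callbacks.ToArray(); }

        foreach (var callback in snapshot)
        {
            // true means "this was my reply, I am done"
            if (callback(response))
            {
                lock (_gate) { _callbacks.Remove(callback); }
            }
        }
    }
}
```

Handlers that return false stay registered, so a long-lived listener and a one-shot waiter can share the same list.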

Anže Vodovnik
  • This is quite interesting, and I'll give it a look. In this particular scenario, we're less concerned with performance and more about management of concerns. The `Arduino` class should not care what is done with the data (as it does now), it should leave that responsibility with whichever classes were responsible for sending the originating commands. Considering only <100 of these commands are sent per minute, performance is not critical, but this will be interesting reading for sure. – drharris Aug 02 '11 at 18:59
  • Can your responses be sent out of order? Or is it always so that your underlying system will process them in a queued fashion? – Anže Vodovnik Aug 02 '11 at 19:12
  • They can be sent at any time, in any order. Various components in the system will make requests at near-random times, and the microcontroller will process them somewhat asynchronously. For example, one component may request the temperature, which is a fast operation, while another component is moving a stepper motor, which takes longer. The sensor request and reply may happen all during the motor operation. The requests are processed linearly, but the replies I get come in no particular order. – drharris Aug 02 '11 at 19:20
  • But you can deduce which reply is for which message? If so, then I suggest moving the **whole concern** to a pipeline-based system. So use a queue for dispatching, and a pipeline for each reply. This means that each member gets notified of a response. I'll edit #2 my answer above to reflect this. – Anže Vodovnik Aug 02 '11 at 20:35
  • This is actually right along the line of what we were thinking, but it helps us abstract it even further. I think separating this portion out into the `TemperatureMeter` class is the missing piece. We were trying to do this directly from another thread, which is why the code was growing. We'll probably take it one step further and encapsulate that kind of logic within an `abstract class` and allow each base class to be created from configuration by a factory. Thanks for helping us clear it up! – drharris Aug 02 '11 at 21:39
1

If you are using .NET 4.0, a better choice is a BlockingCollection; for older versions, use a combination of a Queue<T> and an AutoResetEvent. That way the consumer thread is notified when an item is added and simply consumes it. This is a push model, whereas your current implementation uses polling ("each time you are asking if there is any data").

Example: 4.0

//declare the buffer
private BlockingCollection<Data> _buffer = new BlockingCollection<Data>(new ConcurrentQueue<Data>());

//at the producer method "whenever you receive an item":
_buffer.Add(new Data());

//at the consumer thread "another thread (or threads) that runs to consume the data when it arrives":
foreach (Data data in _buffer.GetConsumingEnumerable())// or "_buffer.Take"; it will block here automatically, waiting for new items to be added
{
    //handle the data here.
}

Example: other "lower" versions:

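//Queue<T> is not thread-safe (and ConcurrentQueue<T> only exists from 4.0 on), so guard it with a lock
private readonly Queue<Data> _queue = new Queue<Data>();
private readonly AutoResetEvent _queueNotifier = new AutoResetEvent(false);

//at the producer:
lock (_queue)
{
    _queue.Enqueue(new Data());
}
_queueNotifier.Set();

//at the consumer:
while (true)//or some condition
{
    _queueNotifier.WaitOne();//here we will block until we receive a signal notification.
    while (true)//drain everything: several Set() calls can collapse into a single signal
    {
        Data data;
        lock (_queue)
        {
            if (_queue.Count == 0) break;
            data = _queue.Dequeue();
        }
        //handle the data
    }
}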
Jalal Said
  • We basically do this already to process the incoming data from the serial port. We encapsulate each reply and put it in a `BlockingCollection`. Then, another thread simply waits for a new reply to come in and then processes it once it gets it. The question here is how to then process it in a way that will allow a second consumer (specific to the reply I get) to wait for that reply. – drharris Aug 02 '11 at 18:50