3

I am implementing the RTS/CTS handshaking technique in UnetStack. I have written code implementing RTS/CTS between 2 nodes (for testing), but I am facing some issues with it.

This is on UnetStack IDE (version 1.3). Following are the issues I face: 1. In fshrc.groovy, I have used the ReservationReq API to send an RTS. RxFrameNtf is used to get the notification for the received signal, but when I use the variable 'rx' to check whether the received signal is a CTS, I get a null value (line 24 of fshrc.groovy), which indicates that I am not receiving any CTS.

  2. When I send a ReservationReq to node 2 from node 1, I am not sure which function in MySimpleHandshakeMac.groovy, processRequest or processMessage, is being called, as none of their print statements are executing.

Code 1: fshrc.groovy

import org.arl.unet.*
import org.arl.unet.phy.*
import org.arl.unet.*
import org.arl.unet.phy.*
import org.arl.unet.mac.*
import org.arl.fjage.*

// Look up the agent that provides the MAC service.
def mac = agentForService Services.MAC

// Subscribe to notifications published by phy.
// NOTE(review): phy is not defined in this script; presumably it is supplied by
// the shell environment the script is loaded into — confirm.
subscribe phy

// Define the 'send' command: request a channel reservation towards addr, wait for
// frames from the physical layer and, if a CTS from addr is seen, transmit value
// to addr as a datagram.
//   addr  - address of the destination node
//   value - payload passed as the data of the DatagramReq
send = { addr, value ->
  println "sending RTS to node $addr"
  // NOTE(review): the request is pushed to phy with '<<' although its recipient
  // field names mac — confirm which agent actually receives it.
  phy << new ReservationReq(recipient: mac, to: addr, duration: 5.second)//not sure about the syntax
  println "RTS Sent"
  // Wait up to 1000 ms for any RxFrameNtf; the result is dereferenced below
  // without a null check, so a timeout here makes msg.data fail.
  def msg = receive(RxFrameNtf, 1000)
  println "abc1"
  // Wait up to 5000 ms for an RxFrameNtf that was sent by addr.
  def rxNtf = receive({ it instanceof RxFrameNtf && it.from == addr}, 5000)
  println "abc2"
  println rxNtf
  // NOTE(review): neither pdu nor CTS_PDU is defined anywhere in this script —
  // confirm where they are expected to come from.
   def rx = pdu.decode(msg.data)
  if(rx.type == CTS_PDU && rxNtf && rxNtf.from == addr)
  {
    // NOTE(review): the message below ends with an extra '}' that will be printed literally.
    println "CTS Received at ${rxNtf.to} from ${rxNtf.from}}"
    phy << new DatagramReq(to: addr, protocol: Protocol.DATA, data: value)
  } 
}

Code 2: handshake-sim.groovy

//! Simulation: 2-node network with RTS/CTS handshake MAC

import org.arl.fjage.Message
import org.arl.unet.*
import org.arl.mac.*
import org.arl.unet.phy.*

import org.arl.fjage.RealTimePlatform

// Select RealTimePlatform as the platform for this simulation.
platform = RealTimePlatform

// run simulation forever
simulate {
  // Node '1' (address 1, located at [0, 0, 0]) with shell: true. Its stack adds a
  // MySimpleHandshakeMac agent named 'hand' and registers fshrc.groovy (from the
  // directory containing this script) as a shell initrc.
  // NOTE(review): 'remote: 1101' is presumably a network port — confirm.
  node '1', address: 1, remote: 1101, location: [0, 0, 0], shell: true, stack: { container ->
    container.add 'hand', new MySimpleHandshakeMac()
    container.shell.addInitrc "${script.parent}/fshrc.groovy"
  }
  // Node '2' (address 2, located 1 km away along the first coordinate). Its stack
  // adds only the MySimpleHandshakeMac agent; no initrc is loaded.
  // NOTE(review): 'shell:5102' is presumably the port the shell listens on — confirm.
  node '2', address: 2, remote: 1102, location: [1.km, 0, 0], shell:5102, stack: { container ->
    container.add 'hand', new MySimpleHandshakeMac()
  }
}

Code 3: MySimpleHandshakeMac.groovy

import org.arl.fjage.*
import org.arl.unet.*
import org.arl.unet.phy.*
import org.arl.unet.mac.*
import org.arl.unet.nodeinfo.*

/**
 * Agent that registers itself as a MAC service and implements an RTS/CTS
 * handshake on top of the physical layer.
 *
 * Accepted ReservationReq messages are queued. A finite state machine sends an
 * RTS frame for the request at the head of the queue, waits for a CTS, and then
 * reports the start and end of the reservation to the requester through
 * ReservationStatusNtf messages. RTS/CTS frames addressed to this node or
 * overheard for other nodes drive the RX and BACKOFF states respectively.
 */
class MySimpleHandshakeMac extends UnetAgent {

  ////// protocol constants

  // protocol number used for the RTS/CTS frames sent and accepted by this agent
  private final static int PROTOCOL = Protocol.MAC

  private final static float  RTS_BACKOFF     = 2.seconds   // backoff applied when an RTS for another node is overheard
  private final static float  CTS_TIMEOUT     = 5.seconds   // wait after sending an RTS before giving up on the CTS
  private final static float  BACKOFF_RANDOM  = 5.seconds   // upper bound passed to rnd() for the delay before sending an RTS
  private final static float  MAX_PROP_DELAY  = 2.seconds   // added twice to reservation durations in RX and BACKOFF
  private final static int    MAX_RETRY       = 3           // RTS attempts before the request is reported as FAILURE
  private final static int    MAX_QUEUE_LEN   = 16          // requests beyond this queue length are refused

  ////// reservation request queue

  private Queue<ReservationReq> queue = new ArrayDeque<ReservationReq>()

  ////// PDU encoder/decoder

  private final static int RTS_PDU = 0x01
  private final static int CTS_PDU = 0x02

  private final static PDU pdu = PDU.withFormat {
    uint8('type')         // RTS_PDU/CTS_PDU
    uint16('duration')    // ms
  }

  ////// protocol FSM

  private enum State {
    IDLE, RTS, TX, RX, BACKOFF
  }

  // RX_* events are triggered for frames addressed to this node, SNOOP_* events
  // for frames addressed to another node (see processMessage)
  private enum Event {
    RX_RTS, RX_CTS, SNOOP_RTS, SNOOP_CTS
  }

  private FSMBehavior fsm = FSMBuilder.build {

    int retryCount = 0      // number of CTS timeouts for the request at the head of the queue
    float backoff = 0       // duration used by the BACKOFF state on entry
    def rxInfo              // info map of the RTS being answered in the RX state

    // IDLE: if a request is queued, schedule a move to RTS after a delay of
    // rnd(0, BACKOFF_RANDOM); react to received or overheard RTS/CTS frames.
    state(State.IDLE) {
      action {
        if (!queue.isEmpty()) {
          after(rnd(0, BACKOFF_RANDOM)) {
            setNextState(State.RTS)
          }
        }
        block()
      }
      onEvent(Event.RX_RTS) { info ->
        rxInfo = info
        setNextState(State.RX)
      }
      onEvent(Event.SNOOP_RTS) {
        backoff = RTS_BACKOFF
        setNextState(State.BACKOFF)
      }
      onEvent(Event.SNOOP_CTS) { info ->
        backoff = info.duration + 2*MAX_PROP_DELAY
        setNextState(State.BACKOFF)
      }
    }

    // RTS: send an RTS frame for the request at the head of the queue (the request
    // stays queued) and wait for a CTS.
    state(State.RTS) {
      onEnter {
        Message msg = queue.peek()
        // duration is carried in milliseconds, rounded up
        def bytes = pdu.encode(type: RTS_PDU, duration: Math.ceil(msg.duration*1000))
        phy << new TxFrameReq(to: msg.to, type: Physical.CONTROL, protocol: PROTOCOL, data: bytes)
        after(CTS_TIMEOUT) {
          // no CTS within CTS_TIMEOUT: count the attempt; once MAX_RETRY is reached,
          // drop the request and report FAILURE to the requester
          if (++retryCount >= MAX_RETRY) {
            sendReservationStatusNtf(queue.poll(), ReservationStatus.FAILURE)
            retryCount = 0
          }
          setNextState(State.IDLE)
        }
      }
      onEvent(Event.RX_CTS) {
        setNextState(State.TX)
      }
    }

    // TX: the reservation was granted; remove the request from the queue, notify the
    // requester of START, and of END once the requested duration has elapsed.
    state(State.TX) {
      onEnter {
        ReservationReq msg = queue.poll()
        retryCount = 0
        sendReservationStatusNtf(msg, ReservationStatus.START)
        after(msg.duration) {
          sendReservationStatusNtf(msg, ReservationStatus.END)
          setNextState(State.IDLE)
        }
      }
    }

    // RX: answer the received RTS with a CTS carrying the same duration, then stay
    // out of IDLE for that duration plus 2*MAX_PROP_DELAY.
    state(State.RX) {
      onEnter {
        def bytes = pdu.encode(type: CTS_PDU, duration: Math.round(rxInfo.duration*1000))
        phy << new TxFrameReq(to: rxInfo.from, type: Physical.CONTROL, protocol: PROTOCOL, data: bytes)
        after(rxInfo.duration + 2*MAX_PROP_DELAY) {
          setNextState(State.IDLE)
        }
        rxInfo = null
      }
    }

    // BACKOFF: wait for the current backoff value before returning to IDLE; an
    // overheard RTS/CTS sets a new backoff value and re-enters the state.
    state(State.BACKOFF) {
      onEnter {
        after(backoff) {
          setNextState(State.IDLE)
        }
      }
      onEvent(Event.SNOOP_RTS) {
        backoff = RTS_BACKOFF
        reenterState()
      }
      onEvent(Event.SNOOP_CTS) { info ->
        backoff = info.duration + 2*MAX_PROP_DELAY
        reenterState()
      }
    }

  } // of FSMBuilder

  ////// agent startup sequence

  private AgentID phy     // agent providing the PHYSICAL service, looked up in startup()
  private int addr        // this node's address, fetched from the NODE_INFO service in startup()

  /** Registers this agent as a provider of the MAC service. */
  @Override
  void setup() {
    register Services.MAC
  }

  /**
   * Looks up the physical layer agent, subscribes to its notifications and to its
   * SNOOP topic, schedules a one-shot behavior that reads this node's address from
   * the node info service, and adds the protocol FSM to the agent.
   */
  @Override
  void startup() {
    phy = agentForService Services.PHYSICAL
    subscribe(phy)
    subscribe(topic(phy, Physical.SNOOP))
    add new OneShotBehavior({
      def nodeInfo = agentForService Services.NODE_INFO
      addr = get(nodeInfo, NodeInfoParam.address)
    })
    add(fsm)
  }

  ////// process MAC service requests

  /**
   * Handles requests sent to this agent.
   *
   * A ReservationReq is refused when its destination is the broadcast address or
   * this node's own address, when its duration is not in
   * (0, maxReservationDuration], or when the queue already holds MAX_QUEUE_LEN
   * requests; otherwise it is queued, the FSM is restarted and a ReservationRsp
   * is returned. ReservationCancelReq, ReservationAcceptReq and TxAckReq are
   * always refused.
   *
   * @param msg the incoming request
   * @return the response message, or null for any other message type
   */
  @Override
  Message processRequest(Message msg) {
    println "processRequest"
    switch (msg) {
      case ReservationReq:
        if (msg.to == Address.BROADCAST || msg.to == addr) return new Message(msg, Performative.REFUSE)
        if (msg.duration <= 0 || msg.duration > maxReservationDuration) return new Message(msg, Performative.REFUSE)
        if (queue.size() >= MAX_QUEUE_LEN) return new Message(msg, Performative.REFUSE)
        queue.add(msg)
        fsm.restart()      // tell fsm to check queue, as it may block if the queue is empty
        println "Request Accepted"
        return new ReservationRsp(msg)
      case ReservationCancelReq:
      case ReservationAcceptReq:
      case TxAckReq:
        return new Message(msg, Performative.REFUSE)
    }
    return null
  }

  ////// handle incoming MAC packets

  /**
   * Handles messages that are not requests. For an RxFrameNtf whose protocol
   * equals PROTOCOL, decodes the PDU and triggers the matching FSM event: RX_RTS or
   * RX_CTS when the frame is addressed to this node, SNOOP_RTS or SNOOP_CTS
   * otherwise. All other messages are ignored after the debug print.
   *
   * @param msg the incoming message
   */
  @Override
  void processMessage(Message msg) {
    println "processMessage"
    if (msg instanceof RxFrameNtf && msg.protocol == PROTOCOL) {
      def rx = pdu.decode(msg.data)
      // the PDU carries the duration in ms; convert it back to seconds
      def info = [from: msg.from, to: msg.to, duration: rx.duration/1000.0]
      if (rx.type == RTS_PDU) fsm.trigger(info.to == addr ? Event.RX_RTS : Event.SNOOP_RTS, info)
      else if (rx.type == CTS_PDU) fsm.trigger(info.to == addr ? Event.RX_CTS : Event.SNOOP_CTS, info)
    }
  }


  ////// expose parameters that are expected of a MAC service

  final int reservationPayloadSize = 0            // read-only parameters
  final int ackPayloadSize = 0
  final float maxReservationDuration = 65.535     // 65535 ms, the largest value of the uint16 'duration' PDU field

  @Override
  List<Parameter> getParameterList() {            // publish list of all exposed parameters
    return allOf(MacParam)
  }

  boolean getChannelBusy() {                      // channel is considered busy if fsm is not IDLE
    return fsm.currentState.name != State.IDLE
  }

  float getRecommendedReservationDuration() {     // recommended reservation duration is one DATA packet
    return get(phy, Physical.DATA, PhysicalChannelParam.frameDuration)
  }

  ////// utility methods

  /**
   * Sends a ReservationStatusNtf about the given request to the agent that sent it.
   *
   * @param msg    the reservation request the notification refers to
   * @param status the reservation status to report
   */
  private void sendReservationStatusNtf(ReservationReq msg, ReservationStatus status) {
    send new ReservationStatusNtf(recipient: msg.sender, requestID: msg.msgID, to: msg.to, from: addr, status: status)
  }
}

I am printing rxNtf. 'null' is being printed as the value of rxNtf.

1 Answer

4

When you send a ReservationReq, the MAC agent will send an RTS and receive a CTS. While you may be able to subscribe to PHY directly to listen for the CTS, that is not the correct way to use the MAC. Instead, you should wait for a MAC ReservationStatusNtf with status ReservationStatus.START before beginning your transmission.

Here's some example code to show how the approach works:

// Reserve the channel through the MAC service and transmit only once the MAC
// reports that the reservation has started.
// NOTE: addr, value, timeout and phy are not defined in this snippet; they must be
// provided by the context it is placed in.
def mac = agentForService Services.MAC
if (mac) {
  // send reservation request
  def req = new ReservationReq(recipient: mac, to: addr, duration: 5.second)
  def rsp = request req
  // continue only if the MAC responded and agreed to the request
  if (rsp && rsp.performative == Performative.AGREE) {
    // wait for a channel reservation notification
    def ntf = receive(ReservationStatusNtf, timeout)
    // the notification must refer to this request and signal the start of the reservation
    if (ntf && ntf.requestID == req.msgID && ntf.status == ReservationStatus.START) {
      // request granted, make your transmission
      phy << new DatagramReq(to: addr, protocol: Protocol.DATA, data: value)
    }
  }
}

Apart from the fact that listening directly for the CTS packet would be fragile (as MySimpleHandshakeMac may change its protocol details), your approach fails because there is a random backoff (BACKOFF_RANDOM, set to 5 seconds) in MySimpleHandshakeMac, while your timeout in fshrc.groovy was only 1 second. So you likely timed out before the RTS was even sent, let alone the CTS received.

Mandar Chitre
  • 2,110
  • 6
  • 14