I want to know if there's a way for a ZeroMQ socket to do only reading or only writing. Even though there are async/multithreaded examples, every thread in them still seems to use a recv-then-send loop. What I want is a receiveMessage() that reads from a ZeroMQ socket and a sendMessage(msg) that writes to a ZeroMQ socket, with each of those methods running in a separate thread that is constructed in another class. Here's my code (I'm using jeromq from Scala):

trait ZmqProtocol extends Protocol {

  val context: ZContext = new ZContext(1)
  private val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
  private val backendSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)

  frontendSocket.bind("tcp://*:5555")
  backendSocket.bind("inproc://backend")


  new Thread(() => {

    println("Started receiving messages")
    // Connect backend to frontend via a proxy
    ZMQ.proxy(frontendSocket, backendSocket, null)

  }).start()


  override def receiveMessage(): (String, String) = {

    val inprocReadSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
    inprocReadSocket.connect("inproc://backend")

    //  The DEALER socket gives us the address envelope and message
    val msg = ZMsg.recvMsg(inprocReadSocket)

    // Message from client's REQ socket contains 3 frames: address + empty frame + request content
    // (payload)
    val address = msg.pop
    val emptyFrame = msg.pop
    val request = msg.pop

    assert(request != null)
    msg.destroy()

    println(s"RECEIVED: $request FROM: $address")

    (address.toString, request.toString)
  }

  override def sendMessage(address: String, response: String): Unit = {

    val inprocWriteSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
    inprocWriteSocket.connect("inproc://backend")

    val addressFrame = new ZFrame(address)
    val emptyFrame = new ZFrame("")
    val responseFrame = new ZFrame(response)

    addressFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
    // Sending empty frame because client expects such constructed message
    emptyFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
    responseFrame.send(inprocWriteSocket, ZFrame.REUSE)

    addressFrame.destroy()
    emptyFrame.destroy()
    responseFrame.destroy()
  }

}

And here's how I would use it:

class TrafficHandler(val requestQueue: LinkedBlockingQueue[(String, Message)],
                     val responseQueue: LinkedBlockingQueue[(String, String)])
  extends Protocol {

def startHandlingTraffic(): Unit = {

    new Thread(() => {

      while (true) {

        val (address, message) = receiveMessage()

        requestQueue.put((address, message))
      }
    }).start()

    new Thread(() => {
      while (true) {

        val (address, response) = responseQueue.take()
        sendMessage(address, response)
      }
    }).start()
  }
}

During debugging, I noticed that I received the message and correctly took the response from the response queue (a concurrent blocking queue) with the correct destination address, but sending it failed silently. I dug a bit into the jeromq code, and it seems to have something to do with the identity, because outPipe is null. I'm guessing it's because I don't have a correct recv-send loop.

EDIT after @user3666197's response: The code works! (Although if you start the server first, it takes time to bind and connect the PUSH and PULL sockets.)
Here is modified code that uses PUSH and PULL sockets:

trait ZmqProtocol extends Protocol {

  val context: ZContext = new ZContext(1)

  val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
  frontendSocket.bind("tcp://*:5555")

  val requestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  requestQueueSocket.bind("inproc://requestQueueSocket")

  val responseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
  responseQueueSocket.bind("inproc://responseQueueSocket")

  val inprocRequestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
  inprocRequestQueueSocket.connect("inproc://requestQueueSocket")

  val inprocResponseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  inprocResponseQueueSocket.connect("inproc://responseQueueSocket")

  new Thread(() => {

    println("Started receiving messages")

    while (true) {

      val msg = ZMsg.recvMsg(frontendSocket)

      // Message from client's REQ socket contains 3 frames: address + empty frame + request content
      // (payload)
      val reqAddress = msg.pop
      val emptyFrame = msg.pop
      val reqPayload = msg.pop

      assert(reqPayload != null)
      msg.destroy()

      println(s"RECEIVED: $reqPayload FROM: $reqAddress")

      requestQueueSocket.send(s"$reqAddress;$reqPayload")

      val responseMessage = new String(responseQueueSocket.recv(0))
      val respMessageSplit = responseMessage.split(";")

      val respAddress = respMessageSplit(0)
      val respPayload = respMessageSplit(1)

      val array = new BigInteger(respAddress, 16).toByteArray
      val respAddressFrame = new ZFrame(array)
      val respEmptyFrame = new ZFrame("")
      val respPayloadFrame = new ZFrame(respPayload)

      respAddressFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
      // Sending empty frame because client expects such constructed message
      respEmptyFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
      respPayloadFrame.send(frontendSocket, ZFrame.REUSE)

      respAddressFrame.destroy()
      respEmptyFrame.destroy()
      respPayloadFrame.destroy()

    }

  }).start()


  override def receiveMessage(): (String, String) = {

    val message = new String(inprocRequestQueueSocket.recv(0))
    val messageSplit = message.split(";")

    val address = messageSplit(0)
    val payload = messageSplit(1)

    (address, payload)
  }

  override def sendMessage(address: String, response: String): Unit = {

    inprocResponseQueueSocket.send(s"$address;$response")
  }
}
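
A note on the `address;payload` string used above, offered as a hedged sketch rather than a fix confirmed by the post. `BigInteger.toByteArray` returns the minimal two's-complement form, so a leading zero byte in the identity is dropped (auto-generated ROUTER identities typically begin with one), and `split(";")` cuts a payload that itself contains `;`. The names `EnvelopeCodec`, `toHex`, `fromHex`, `encode` and `decode` are hypothetical; only the standard library is used.

```scala
import java.math.BigInteger

object EnvelopeCodec {
  /** Hex-encodes identity bytes, two digits per byte, so leading zeros survive. */
  def toHex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString

  /** Decodes a hex string back into exactly hex.length / 2 bytes. */
  def fromHex(hex: String): Array[Byte] =
    hex.grouped(2).map(pair => Integer.parseInt(pair, 16).toByte).toArray

  def encode(identity: Array[Byte], payload: String): String =
    s"${toHex(identity)};$payload"

  /** Splits on the first ';' only, so the payload may contain ';' itself. */
  def decode(envelope: String): (Array[Byte], String) = {
    val parts = envelope.split(";", 2)
    (fromHex(parts(0)), parts(1))
  }

  def main(args: Array[String]): Unit = {
    // Illustrative identity with a leading zero byte (an assumption, not taken from the post).
    val identity = Array[Byte](0x00, 0x6b, 0x0b, 0x45, 0x67)
    val viaBigInteger = new BigInteger(toHex(identity), 16).toByteArray
    println(s"BigInteger round trip keeps ${viaBigInteger.length} of ${identity.length} bytes")
    val (id, payload) = decode(encode(identity, "a;b"))
    println(s"codec round trip keeps ${id.length} bytes, payload = $payload")
  }
}
```

If the identity comes back shorter than it went in, the ROUTER has no peer with that identity and the reply is dropped without an error, which would match the silent failure described earlier.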

Here is the client if needed:

trait ZmqClientProtocol extends ClientProtocol {

  val context: ZMQ.Context = ZMQ.context(1)
  val socket: ZMQ.Socket = context.socket(ZMQ.REQ)

  println("Connecting to server")
  socket.connect("tcp://localhost:5555")

  override protected def send(message: String): String = {

    // Convert the message to bytes (note: no terminating 0 byte is appended here)
    val request = message.getBytes()

    // Send the message
    println(s"Sending request $message")
    socket.send(request, 0)

    //  Get the reply.
    val reply = socket.recv(0)

    s"$message=${new String(reply)}"
  }
}
Grant Miller
Bade

1 Answer

Is there a way for a ZeroMQ socket to do only reading or only writing?

Yes, several ways.

a ) use a tandem of simplex archetypes: PUSH/PULL for writes and PULL/PUSH for reads
b ) use a tandem of simplex archetypes: (X)PUB/(X)SUB for writes and (X)SUB/(X)PUB for reads
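
Option a ) can be sketched as follows. This is a minimal illustration, assuming jeromq is on the classpath and using the same integer socket-type constants as the question (newer jeromq releases prefer `SocketType`). The endpoint name and the `SimplexPipeSketch` object are hypothetical.

```scala
import org.zeromq.{ZContext, ZMQ}

object SimplexPipeSketch {
  // Hypothetical endpoint name, chosen only for this sketch.
  private val Endpoint = "inproc://simplex-sketch"

  /** Sends `text` through a write-only PUSH end and reads it from a read-only PULL end. */
  def roundTrip(text: String): String = {
    val context = new ZContext()
    try {
      val writeOnly = context.createSocket(ZMQ.PUSH)
      writeOnly.bind(Endpoint)              // bind first, then connect
      val readOnly = context.createSocket(ZMQ.PULL)
      readOnly.connect(Endpoint)

      writeOnly.send(text)                  // the PUSH end never calls recv()
      readOnly.recvStr()                    // the PULL end never calls send()
    } finally context.close()
  }

  def main(args: Array[String]): Unit = println(roundTrip("hello"))
}
```

Each direction of traffic gets its own such pair, so a reader thread and a writer thread never touch the same socket.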


... still uses .recv()-then-.send() loop.

Well, this observation relates more to the actual socket archetype: some archetypes ( REQ and REP, for example ) indeed require a mandatory two-step sequencing of alternating .recv() and .send() calls ( hardwired inside their internal FSA-s ).


... but each of those methods would run in separate thread

Well, here the challenge starts: ZeroMQ has been designed from its inception as principally zero-sharing, so as to foster performance and independence. Zen-of-Zero is an interesting principle in design.

Yet, recent re-design efforts have presented in API 4.2+ a will to make ZeroMQ socket Access-points thread-safe ( which goes against the initial principle of share-nothing ), so if you experiment in this direction, you may arrive in territories that work, but at the cost of departing from Zen-of-Zero.

ZeroMQ Socket Access-point(s) should never be shared, even if possible, because of design purity.

Better equip such a class with another pair of simplex PUSH/PULL-s, if you strive for separation of OOP-concerns. The head-end(s) of such read-only-specialised + write-only-specialised sockets then stay isolated from the native ZeroMQ-socket, which remains hidden from both of the ( specialised ) head-end classes. The "remote" class ( beyond the foreign class-boundary of abstraction ) will have to handle that ZeroMQ Socket-archetype's FSA, its settings, performance tweaking and error-state(s), plus mediate all message-transfers to/from the native ZeroMQ-socket.

In any case, doable with due design care.
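
A minimal sketch of such a mediating owner thread, under stated assumptions: jeromq is on the classpath, the integer socket-type constants from the question are used, and all endpoint names and the `RouterOwnerSketch` object are hypothetical. To stay self-contained, the frontend is bound on `inproc://` here instead of the question's `tcp://*:5555`. One thread owns the ROUTER and polls both it and the response PULL socket, so it never blocks in a fixed recv-then-send order.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.atomic.AtomicBoolean
import org.zeromq.{ZContext, ZFrame, ZMQ, ZMsg}

object RouterOwnerSketch {
  // Hypothetical inproc endpoint names, chosen only for this sketch.
  private val Frontend  = "inproc://sketch-frontend"
  private val Requests  = "inproc://sketch-requests"
  private val Responses = "inproc://sketch-responses"

  /** Runs on the single thread that uses `frontend`; polls so neither direction blocks the other. */
  def ownerLoop(ctx: ZContext, frontend: ZMQ.Socket, requestsOut: ZMQ.Socket,
                responsesIn: ZMQ.Socket, running: AtomicBoolean): Unit = {
    val poller = ctx.createPoller(2)
    poller.register(frontend, ZMQ.Poller.POLLIN)     // index 0
    poller.register(responsesIn, ZMQ.Poller.POLLIN)  // index 1
    while (running.get()) {
      poller.poll(50)                                // wake up regularly to re-check `running`
      // Forward whole multipart messages, so the identity frame keeps its raw bytes.
      if (poller.pollin(0)) ZMsg.recvMsg(frontend).send(requestsOut)
      if (poller.pollin(1)) ZMsg.recvMsg(responsesIn).send(frontend)
    }
  }

  /** Wires a REQ client, the owner thread and one worker together; returns the client's reply. */
  def demo(): String = {
    val ctx = new ZContext()
    val running = new AtomicBoolean(true)
    try {
      val frontend = ctx.createSocket(ZMQ.ROUTER); frontend.bind(Frontend)
      val requestsOut = ctx.createSocket(ZMQ.PUSH); requestsOut.bind(Requests)
      val responsesIn = ctx.createSocket(ZMQ.PULL); responsesIn.bind(Responses)

      val owner = new Thread(() => ownerLoop(ctx, frontend, requestsOut, responsesIn, running))
      val worker = new Thread(() => {
        val in = ctx.createSocket(ZMQ.PULL); in.connect(Requests)     // read-only end
        val out = ctx.createSocket(ZMQ.PUSH); out.connect(Responses)  // write-only end
        val msg = ZMsg.recvMsg(in)                                    // identity + empty + payload
        val payload = new String(msg.removeLast().getData(), UTF_8)
        msg.add(new ZFrame("echo:" + payload))                        // replace payload, keep envelope
        msg.send(out)
      })
      owner.start(); worker.start()

      val client = ctx.createSocket(ZMQ.REQ); client.connect(Frontend)
      client.send("ping")
      val reply = client.recvStr()
      running.set(false); owner.join(); worker.join()
      reply
    } finally ctx.close()
  }
}
```

The worker side only ever reads from one socket and writes to another, which is the separation the question asks for, while the ROUTER itself is never shared between threads.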


ZeroMQ resources are not any cheaply composable / disposable trash

An idea of:

...
override def sendMessage( address:  String,
                          response: String
                          ): Unit = {

             val inprocWriteSocket: ZMQ.Socket  = context.createSocket( ZMQ.DEALER )
                 inprocWriteSocket.connect( "inproc://backend" )
                 ...

may seem easy in the source code, but it ignores the actual setup overheads. No socket ( the inproc://-transport-class being a special case ) gets RTO ( Ready-To-Operate ) in the very microsecond it was instantiated inside the Context(), let alone fully .connect()-ed and RTO-ed after all handshaking with the remote counterparty. So best set up the SIG/MSG-infrastructure well beforehand and keep it as a semi-persistent communication layer, rather than an ad-hoc / just-in-time initiated composable/disposable one ( Ecology of Resources ).
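
One way to follow this advice is to open the socket once and reuse it for every call. The sketch below is illustrative only: `ResponseSender`, `ResponseSenderDemo` and the endpoint name are hypothetical, jeromq is assumed to be on the classpath, and the `address;response` string format is borrowed from the question's edited code.

```scala
import org.zeromq.{ZContext, ZMQ}

/** Hypothetical helper: opens its PUSH socket once and reuses it for every message. */
final class ResponseSender(context: ZContext, endpoint: String) {
  private val socket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  socket.connect(endpoint)

  /** Call only from the thread that uses this sender; the socket is not shared. */
  def sendMessage(address: String, response: String): Boolean =
    socket.send(s"$address;$response")
}

object ResponseSenderDemo {
  def run(): List[String] = {
    val context = new ZContext()
    try {
      val sink = context.createSocket(ZMQ.PULL)
      sink.bind("inproc://responses-sketch")   // bound before the sender connects
      val sender = new ResponseSender(context, "inproc://responses-sketch")
      sender.sendMessage("client-1", "first")  // same socket ...
      sender.sendMessage("client-1", "second") // ... reused, no per-call setup
      List(sink.recvStr(), sink.recvStr())
    } finally context.close()
  }
}
```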


inproc://-transport-class has one more requirement pre-API 4.x:

Connecting a socket

When connecting a socket to a peer address using zmq_connect() with the inproc:// transport, the endpoint shall be interpreted as an arbitrary string identifying the name to connect to. Before version 4.0 the name must have been previously created by assigning it to at least one socket within the same ØMQ context as the socket being connected. Since version 4.0 the order of zmq_bind() and zmq_connect() does not matter just like for the tcp:// transport type.

So in cases where your deployment is unsure about the actual localhost API version, take care to enforce the proper order of .bind() / .connect(), otherwise the inproc:// pipes will not work for you.
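
A small, hedged sketch of such a check, assuming jeromq's static `ZMQ.getMajorVersion()` and `ZMQ.getVersionString()` report the ZeroMQ API level the binding implements. `InprocOrderSketch` and `bindMustComeFirst` are hypothetical names.

```scala
import org.zeromq.ZMQ

object InprocOrderSketch {
  /** Per the documentation quoted above, bind must precede connect before version 4.0. */
  def bindMustComeFirst(majorVersion: Int): Boolean = majorVersion < 4

  def main(args: Array[String]): Unit = {
    val major = ZMQ.getMajorVersion()
    println(s"ZeroMQ API ${ZMQ.getVersionString()}, bind-before-connect required: ${bindMustComeFirst(major)}")
  }
}
```

Binding before connecting is harmless on every version, so doing it unconditionally is the simpler choice when the version is unknown.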

user3666197
  • Thank you for your response! So, instead of this `ZMQ.proxy()` line, I would have reading from `ROUTER`, then `PUSH`ing. In `receiveMessage()` I would `PULL` that message and save it to queue. In `sendMessage()` I would process message and `PUSH` it so that I can `PULL` it from thread where proxy is now; and send to client? – Bade Mar 17 '18 at 09:51
  • Yes, Sir. This was the proposed perspective for separation of concerns with Zen-of-zero [ avoid sharing, which is principally wrong and performance / parallelism killer :o) ] `inproc://` transports do not require any additional IO-threads' workloads, so the separation of concerns may be here an almost cost-free benefit **:o)** – user3666197 Mar 17 '18 at 12:20
  • I wrote the code as mentioned. I have the same problem again - outpipe is null. I've created and bound the `ROUTER` socket in the same thread that I'm receiving and sending from/to that socket. The message travels correctly through the `PUSH` and `PULL` sockets, and when I finally have to send it to the client through the same `ROUTER` that I received the message from, it fails silently (outpipe is null). Do I need to provide additional info to `ROUTER` along with address+emptyFrame+payload? – Bade Mar 17 '18 at 18:14
  • Would you mind **posting both the updated code** ( best with checkpointing debug-level log-s / print-s as principally proposed in >>> https://stackoverflow.com/a/49313503/3666197 ) **and the error-traces**? It is only fair, if you ask others to help you, to first show a reproducible MCVE together with all error(s)-associated details - StackOverflow does not advise anything but this focus + a reproducible, fact-based problem formulation. **Creeping focus and shifting attention without facts first never helps backtracking towards the root-cause of any problem, does it?** – user3666197 Mar 17 '18 at 20:07
  • I added the code to the question. Comments with "!!" are the ones that puzzle me. There are no errors because it fails silently. – Bade Mar 17 '18 at 21:10
  • I edited the code several times as I was discovering new problems (sorry about that). I solved the problem with identity: I was converting hex to String and used that String when returning to client. The only problem I have now is that everything works only in debug mode when I "slowly" step through the code, or in normal runtime BUT only when I start the clients first. As you can see, I initialized and connected all sockets outside of the send/receive methods. How can I solve this? – Bade Mar 18 '18 at 13:38