2

I'm trying to use gob with a zmq4 socket (from pebbe's zmq4). A zmq4 socket does not have an io device making it seemingly impossible for gob to read/write directly:

I cannot use &client ( type **zmq4.Socket ) as type io.Writer in an argument to gob.NewEncoder: zmq4.Socket does not implement an io.Writer (missing Write method)

One zmq4 send function, the SendMessage(), accepts an interface{}, so I'm using that to send.

On the server side zmq4 receive functions are returning either a string, []byte, []string or [][]byte. I'm using the RecvMessage() which returns a []string.

It'd be OK to write to a bytes.Buffer, send that buffer, read it as a []string then take the content portion of the message to process with gob. Although the current problem lies in converting that []string to a bytes.Buffer for gob to be able to io.Read from it. Sounds basic, but I've tried many approaches so far w/o success. Here's the current one. The problem is apparently with gob yielding "extra data in buffer" when data seems to be the same before sending and after receiving, as the print statements are showing.

Is there a simpler, more go-like of going about this? If you have zmq4, the code below is self-contained and should execute.

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
    "sync"
    "time"

    zmq "github.com/pebbe/zmq4"
)

type LogEntry struct {
    ErrID  int
    Name   string
    Level  string
    LogStr string
}

// The client task
// --------------------------------------------------------------
func client_task(s string) {
    var mu sync.Mutex
    client, _ := zmq.NewSocket(zmq.DEALER)
    defer client.Close()
    client.SetIdentity(s)
    client.Connect("tcp://localhost:5570")

    go func() {

        aLogEntry := &LogEntry{
            ErrID:  1,
            Name:   "Client",
            LogStr: "Log msg sent",
        }

        for request_nbr := 1; true; request_nbr++ {

            var network bytes.Buffer
            enc := gob.NewEncoder(&network)
            err := enc.Encode(aLogEntry)
            if err != nil {
                fmt.Println("encode error:", err)
            }

            // Early decode test - this will influence subsequent gob
            // behaviour so leave commented when caring about the sent
            // data
            // dec := gob.NewDecoder(&network)
            // var aLogEntry2 *LogEntry
            // err = dec.Decode(&aLogEntry2)
            // if err != nil {
            //  fmt.Printf("client_task(DECODE ERROR) : %s\n\n", err)
            // }
            // fmt.Printf("client_task(TEST DECODE) %+v\n\n", aLogEntry)

            mu.Lock()
            // Replaced length by bytes Buffer method: 91
            fmt.Printf("client_task(len) : %d\n\n", network.Len())
            fmt.Printf("client_task(network) : %v\n\n", network)

            client.SendMessage(network, 0)
            mu.Unlock()
            time.Sleep(5 * time.Second)
        }
    }()

    // pause to allow server
    for {
        time.Sleep(100 * time.Millisecond)
    }
}

// The server task
// --------------------------------------------------------------
func server_task() {

    frontend, _ := zmq.NewSocket(zmq.ROUTER)
    defer frontend.Close()
    frontend.Bind("tcp://*:5570")

    for {
        msg, _ := frontend.RecvMessage(0)
        // Added error reporting - does not report any error
        // err does never get filled in here, never an error could get reported here
        if err != nil {
            fmt.Printf("RECV ERROR: %s", err)
        }

        // using WriteString to write the content portion of the
        // received message to the bytes.Buffer for gob to process
        var network bytes.Buffer
        dec := gob.NewDecoder(&network)
        network.WriteString(msg[1])

        // Added length of the bytes Buffer: 285
        // Before sending the bytes Buffer is: 91
        // More than just msg[1] is written into the buffer ?
        fmt.Printf("server_task(len): %d\n\n", network.Len())
        fmt.Printf("server_task(msg[1]) : %s\n\n", msg[1])

        var aLogEntry *LogEntry
        err := dec.Decode(&aLogEntry)
        if err != nil {
            fmt.Printf("server_task(DECODE ERROR) : %s\n\n", err)
        }

        fmt.Printf("server_task(aLogEntry) %+v\n\n", aLogEntry)
    }
}

func main() {
    defer fmt.Println("main() done")

    go client_task("1")
    go server_task()

    //  Run for 5 seconds then quit
    time.Sleep(5 * time.Second)
}

The print statements are showing, on the client side:

client_task(network) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

On the server side:

server_task(msg[1]) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

Which seems pretty much the same.

Results are:

server_task(DECODE ERROR) : extra data in buffer
server_task(aLogEntry) <nil>
user3666197
  • 1
  • 6
  • 50
  • 92
mevla
  • 21
  • 2

1 Answers1

0

OBSERVATION

The ZeroMQ native API defines this property :

When receiving messages a ZMQ_ROUTER socket shall prepend a message part containing the routing id of the originating peer to the message before passing it to the application.

SOLUTION

Either may start using the right-enough PUSH/PULL Scalable Formal Archetype, instead of (oversofisticated for your use-case) DEALER/ROUTER,
or
may rely on an assumption your ROUTER-node will never .RecvMessage( 0 ) with other internal multipart-structure but the one and only one, matching the template of
[ <routing_id> | <[network]-payload> [ | ... ] ]
which cannot be robustly warranted, can it?

While not sure, how the pebbe's zmq4 go-wrapper for ZeroMQ attempts or not to implement all the native-API features and/or handles the potential differences in ( automated without any user-level application intervention? ) reading all multi-part components & handling ot-once and/or NULL-terminated strings' handling, that may conflict inside the .Decode()-method.

Last but not least, if your code relies on msg[1] to have a valid & all conventions compliant payload inside, if I am not overlooking some low-level hack, I am not aware of, I see no explicit handling of cases, when the originating side ( the DEALER ) has not delivered any such new message to the consumer-side ( the ROUTER ), yet the .RecvMessage( 0 )-method fills the msg and proceeds ( with empty msg ) towards the .Decode()-method, where it must for obvious reasons fail on empty or ill-formated msg, must it not?

I would definitely start with a PUSH/PULL replacement, it will not inject the prepended, now multi-frame composition on delivery side, with routing_id and related risks.

The non-blocking-mode returns from .RecvMessage()-method shall still collide on filling msg with empty data, if no pending messages wait inside the PULL-RxQueue-buffers, which shall still panic the .Decode()-method.

In case the .RecvMessage( 0 )-method call actually exhibits a blocking-mode receive, there ought be taken more care into the error-state detection and handling, if ZeroMQ is to be fully excluded from the error's root-cause analysis. More self-defensive .setsockopt()-setups ( ZMQ_LINGER and many others ) for all the deployed ZeroMQ resources would also improve the robustness & the level of error-prone-ness, namely for cases when a crashing application may cause any harm in production.

You may try to reproduce the error : here the IDE unfortunately misses the ZeroMQ part.

Golang.org site failed either :

go: finding module for package github.com/pebbe/zmq4
go: downloading github.com/pebbe/zmq4 v1.2.0
go: found github.com/pebbe/zmq4 in github.com/pebbe/zmq4 v1.2.0
# pkg-config --cflags  -- libzmq
pkg-config: exec: "pkg-config": executable file not found in $PATH

Go build failed.
halfer
  • 19,824
  • 17
  • 99
  • 186
user3666197
  • 1
  • 6
  • 50
  • 92
  • 1
    I've written a reply offline, well detailed, and now I see that there's a limit of some 500+ characters for a comment. The joys of a newbie. Please bear with me as I discover how to use SO. I will try to make the comment 'multi-part'. The other option would be to 'answer the question' which is totally not the intent here. – mevla Apr 27 '20 at 15:56
  • Hmmm.. This is so limiting, not being able to reply fully. I'll try telegraph style. Question based on asyncsrv.go which is the equivalent of page 107 of "Code Connected vol. 1", following frame discussion. Left out popping parts of the message Like to use ROUTER/DEALER if possible.to focus on problem. Decoding the data just before sending it works fine, test code added in the question above. fmt.Printf() shows the data to be the same before sending and after reception. Puzzled. – mevla Apr 27 '20 at 16:03
  • Could it be that the fmt.Printf(%v) statement does not print everything and that there's data not shown for instance in the two outputs in the question above ? Yes, RecvMessage(..., 0) operates in blocking mode which currently I find useful in dealing with the gob problem. – mevla Apr 27 '20 at 16:11
  • Yes, RecvMessage(..., 0) operates in blocking mode which currently I find useful in dealing with the gob problem. Crux of problem: two identically-reported data (fmt.Printf()) = problems gob. Various ways of receiving data in zmq ? Observing ZMQ_RCVMORE polly not needed here, would be in full implementation once this gob problem is solved. I can try PUSH/PULL if it helps shedding light, although using ROUTER/DEALER is what I'm looking at functional wise. Thanks again for your reply and insight - much appreciated. – mevla Apr 27 '20 at 16:15
  • It'd be nice if the playground or tio could handle (some) external packages although I guess that by policy they're not too keen in doing so. I've added error reporting to RecvMessage(). – mevla Apr 28 '20 at 14:07