I'm trying to use gob with a zmq4
socket (from pebbe's zmq4
). A zmq4
socket does not have an io device making it seemingly impossible for gob to read/write directly:
I cannot use &client ( type **zmq4.Socket )
as type io.Writer
in an argument to gob.NewEncoder
: zmq4.Socket
does not implement an io.Writer
(missing Write method)
One zmq4
send function, the SendMessage()
, accepts an interface{}
, so I'm using that to send.
On the server side zmq4
receive functions are returning either a string
, []byte
, []string
or [][]byte
. I'm using the RecvMessage()
which returns a []string
.
It'd be OK to write to a bytes.Buffer
, send that buffer, read it as a []string
then take the content portion of the message to process with gob. Although the current problem lies in converting that []string
to a bytes.Buffer
for gob to be able to io.Read
from it. Sounds basic, but I've tried many approaches so far w/o success. Here's the current one. The problem is apparently with gob yielding "extra data in buffer" when data seems to be the same before sending and after receiving, as the print
statements are showing.
Is there a simpler, more go-like of going about this? If you have zmq4
, the code below is self-contained and should execute.
package main
import (
"bytes"
"encoding/gob"
"fmt"
"sync"
"time"
zmq "github.com/pebbe/zmq4"
)
type LogEntry struct {
ErrID int
Name string
Level string
LogStr string
}
// The client task
// --------------------------------------------------------------
func client_task(s string) {
var mu sync.Mutex
client, _ := zmq.NewSocket(zmq.DEALER)
defer client.Close()
client.SetIdentity(s)
client.Connect("tcp://localhost:5570")
go func() {
aLogEntry := &LogEntry{
ErrID: 1,
Name: "Client",
LogStr: "Log msg sent",
}
for request_nbr := 1; true; request_nbr++ {
var network bytes.Buffer
enc := gob.NewEncoder(&network)
err := enc.Encode(aLogEntry)
if err != nil {
fmt.Println("encode error:", err)
}
// Early decode test - this will influence subsequent gob
// behaviour so leave commented when caring about the sent
// data
// dec := gob.NewDecoder(&network)
// var aLogEntry2 *LogEntry
// err = dec.Decode(&aLogEntry2)
// if err != nil {
// fmt.Printf("client_task(DECODE ERROR) : %s\n\n", err)
// }
// fmt.Printf("client_task(TEST DECODE) %+v\n\n", aLogEntry)
mu.Lock()
// Replaced length by bytes Buffer method: 91
fmt.Printf("client_task(len) : %d\n\n", network.Len())
fmt.Printf("client_task(network) : %v\n\n", network)
client.SendMessage(network, 0)
mu.Unlock()
time.Sleep(5 * time.Second)
}
}()
// pause to allow server
for {
time.Sleep(100 * time.Millisecond)
}
}
// The server task
// --------------------------------------------------------------
func server_task() {
frontend, _ := zmq.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind("tcp://*:5570")
for {
msg, _ := frontend.RecvMessage(0)
// Added error reporting - does not report any error
// err does never get filled in here, never an error could get reported here
if err != nil {
fmt.Printf("RECV ERROR: %s", err)
}
// using WriteString to write the content portion of the
// received message to the bytes.Buffer for gob to process
var network bytes.Buffer
dec := gob.NewDecoder(&network)
network.WriteString(msg[1])
// Added length of the bytes Buffer: 285
// Before sending the bytes Buffer is: 91
// More than just msg[1] is written into the buffer ?
fmt.Printf("server_task(len): %d\n\n", network.Len())
fmt.Printf("server_task(msg[1]) : %s\n\n", msg[1])
var aLogEntry *LogEntry
err := dec.Decode(&aLogEntry)
if err != nil {
fmt.Printf("server_task(DECODE ERROR) : %s\n\n", err)
}
fmt.Printf("server_task(aLogEntry) %+v\n\n", aLogEntry)
}
}
func main() {
defer fmt.Println("main() done")
go client_task("1")
go server_task()
// Run for 5 seconds then quit
time.Sleep(5 * time.Second)
}
The print statements are showing, on the client side:
client_task(network) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}
On the server side:
server_task(msg[1]) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}
Which seems pretty much the same.
Results are:
server_task(DECODE ERROR) : extra data in buffer
server_task(aLogEntry) <nil>