4

I've been trying to subscribe to this API feed: https://www.cryptofacilities.com/resources/hc/en-us/articles/360000578294-Fills

Whenever I try to subscribe, it looks like I'm able to send the payload with WriteJSON, but what I receive from my read method is:

websocket: close 1006 (abnormal closure): unexpected EOF.

The full terminal message is:

2019/08/04 22:20:31 recv: {"event":"info","version":1}
2019/08/04 22:20:31 recv: {"event":"subscribed","feed":"heartbeat"}
2019/08/04 22:20:31 recv: {"event":"challenge","message":"c6e55c07-d07a-4560-9283-be75ee458433"}
^C2019/08/04 22:21:50 interrupt
2019/08/04 22:21:50 write close: write tcp 192.168.1.6:49624->104.16.51.17:443: write: broken pipe

I understand from here that the status code means that my client closed the connection. I'm unable to track down this problem.

I've tried running race detection, turning off the firewall (I'm on a Mac and tried to turn off the antivirus, but found out it's built in - would this be worth pursuing?), increasing the handshake timeout, handling the error with a close frame, using a different OS, and increasing the buffer size and max message size. A lot of this was in reference to this post:

https://github.com/gorilla/websocket/issues/321

This is my client:

package websocket

import (
    "crypto/hmac"
    "crypto/sha256"
    "crypto/sha512"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/gorilla/websocket"
)

// Package-level settings used by the connection helpers in this file.
var (
    // addr is the WebSocket endpoint that OpenWebSocket dials.
    addr = "wss://www.cryptofacilities.com/ws/v1"

    // Wait is added to the current time to form the write deadline that is
    // set before each outgoing message.
    Wait = 50000 * time.Second

    // MaxMessageSize is a message size limit; nothing in the code shown here
    // reads it yet.
    MaxMessageSize int64 = 100000
)

// WebSocket wraps a gorilla/websocket connection so that the feed helpers in
// this package can be written as methods on it.
type WebSocket struct {
    // Conn is the underlying connection. OpenWebSocket populates it and the
    // methods on WebSocket read from and write to it directly.
    Conn *websocket.Conn
}

// Message is the envelope that Decode unmarshals each incoming payload into.
// The fields have no json tags, so decoding relies on encoding/json matching
// the lowercase wire keys ("event", "feed", "message") to these field names
// case-insensitively.
type Message struct {
    // Event is the message kind; Decode switches on values such as "info",
    // "subscribed" and "challenge".
    Event   string
    // Feed names the feed a message belongs to; Decode checks it for
    // "heartbeat" when Event matches none of the known kinds.
    Feed    string
    // Message is the event-specific payload. Decode pre-loads it with a
    // *json.RawMessage so the payload can be decoded later, per event.
    Message interface{}
}

// ChallengeSub mirrors a server "challenge" message. In the code shown here
// only its Message field is used: Decode unmarshals the challenge string into
// it before signing.
type ChallengeSub struct {
    // Event is the "event" key of the message.
    Event   string `json:"event"`
    // Message is the "message" key, i.e. the challenge string to be signed.
    Message string `json:"message"`
}

// HeartBeat is an event/message pair. It is not referenced by any code
// visible in this file; the HeartBeat method on WebSocket builds its request
// from a map instead.
// NOTE(review): presumably intended to model heartbeat feed messages - confirm
// before relying on it.
type HeartBeat struct {
    Event   string
    Message string
}

// FillSubscribe describes the JSON body of a subscription request for the
// "fills" feed. Its json tags match the keys that the FillSubscribe method on
// WebSocket sends, but that method builds a map and does not use this type in
// the code shown here.
type FillSubscribe struct {
    // Event is sent as "event".
    Event             string `json:"event"`
    // Feed is sent as "feed".
    Feed              string `json:"feed"`
    // APIKey is sent as "api_key".
    APIKey            string `json:"api_key"`
    // OriginalChallenge is sent as "original_challenge".
    OriginalChallenge string `json:"original_challenge"`
    // SignedChallenge is sent as "signed_challenge".
    SignedChallenge   string `json:"signed_challenge"`
}

// OpenWebSocket dials addr with the default gorilla dialer and returns the
// connection wrapped in a WebSocket.
//
// A dial error is returned unchanged. If the handshake response is anything
// other than 101 Switching Protocols, the connection is closed and an error
// naming the received status is returned.
func OpenWebSocket() (*WebSocket, error) {
    conn, response, err := websocket.DefaultDialer.Dial(addr, nil)
    if err != nil {
        return nil, err
    }
    if response.StatusCode != http.StatusSwitchingProtocols {
        // The connection is being rejected, so release it instead of leaking
        // the socket; the close error adds nothing to the one reported below.
        _ = conn.Close()
        return nil, fmt.Errorf("failed to upgrade protocol to websocket: %s", response.Status)
    }
    return &WebSocket{Conn: conn}, nil
}

// HeartBeat sends a subscribe request for the "heartbeat" feed and returns
// whatever error the write produced. The write deadline is set to Wait from
// now before the request goes out.
func (c *WebSocket) HeartBeat() error {
    request := map[string]interface{}{
        "feed":  "heartbeat",
        "event": "subscribe",
    }

    deadline := time.Now().Add(Wait)
    c.Conn.SetWriteDeadline(deadline)

    return c.Conn.WriteJSON(request)
}

// Next reads the next message from the connection and returns its raw bytes
// together with any read error. The message type reported by the connection
// is discarded.
func (c *WebSocket) Next() ([]byte, error) {
    _, data, readErr := c.Conn.ReadMessage()

    return data, readErr
}

// Challenge sends a "challenge" request carrying the API key. A failed write
// is logged with the "write:" prefix and also returned to the caller; on
// success the result is nil.
//
// NOTE(review): the API key is embedded in source here; consider loading it
// from configuration instead.
func (c *WebSocket) Challenge() error {
    c.Conn.SetWriteDeadline(time.Now().Add(Wait))

    writeErr := c.Conn.WriteJSON(map[string]interface{}{
        "api_key": "rhsqfT66dxTF7g2O7/t5Cluubjw4MlEz1UoBrZBjf8JocQ/q49j9rH9m",
        "event":   "challenge",
    })
    if writeErr != nil {
        log.Println("write:", writeErr)
    }

    return writeErr
}

// Decode parses one raw server payload and reacts to it based on its "event"
// field:
//
//   - "challenge": signs the challenge string with Signature, sends the fills
//     subscription with FillSubscribe and returns the challenge together with
//     its signature.
//   - "info" and "subscribed": print a status line.
//   - anything else: prints "Live" when the "feed" field is "heartbeat".
//
// For every case other than a successfully decoded challenge the return value
// is the placeholder pair ("No messages to Decode...", "").
//
// A payload that is not valid JSON, or a challenge without a string message,
// is logged and yields the placeholder pair instead of terminating the
// process, so one bad frame cannot take down the caller's read loop.
func (c *WebSocket) Decode(payload []byte) (string, string) {
    const nothing = "No messages to Decode..."

    // Keep the "message" value as raw JSON so it can be decoded per event.
    var raw json.RawMessage
    env := Message{Message: &raw}
    if err := json.Unmarshal(payload, &env); err != nil {
        log.Println("decode:", err)
        return nothing, ""
    }

    switch env.Event {
    case "challenge":
        var challenge string
        if err := json.Unmarshal(raw, &challenge); err != nil {
            log.Println("decode challenge:", err)
            return nothing, ""
        }
        message, signed := c.Signature(challenge)
        // FillSubscribe logs a failed write itself, and this method has no
        // error result to carry it further, so the error is dropped on purpose.
        _ = c.FillSubscribe(message, signed)
        return message, signed
    case "info":
        fmt.Println("Connected:", env.Event)
    case "subscribed":
        fmt.Println("Connected to Heartbeat")
    default:
        if env.Feed == "heartbeat" {
            fmt.Println("Live")
        }
    }
    return nothing, ""
}

// Signature signs a challenge string. It takes the SHA-256 digest of message,
// computes an HMAC-SHA-512 over that digest keyed with the base64-decoded
// literal below, and base64-encodes the result. It returns message unchanged
// together with the encoded signature.
//
// NOTE(review): the key literal is the same string that Challenge and
// FillSubscribe send as "api_key"; presumably the account's API secret is
// expected here instead - confirm against the API documentation.
func (c *WebSocket) Signature(message string) (string, string) {
    digest := sha256.Sum256([]byte(message))

    // The decode error is discarded, as before; the literal is fixed.
    key, _ := base64.StdEncoding.DecodeString("rhsqfT66dxTF7g2O7/t5Cluubjw4MlEz1UoBrZBjf8JocQ/q49j9rH9m")

    mac := hmac.New(sha512.New, key)
    mac.Write(digest[:])

    return message, base64.StdEncoding.EncodeToString(mac.Sum(nil))
}

// FillSubscribe sends a subscribe request for the "fills" feed carrying the
// API key, the original challenge and its signature. A failed write is logged
// with the "write:" prefix and also returned; on success the result is nil.
//
// NOTE(review): the API key is embedded in source here; consider loading it
// from configuration instead.
func (c *WebSocket) FillSubscribe(challenge string, signed string) error {
    c.Conn.SetWriteDeadline(time.Now().Add(Wait))

    writeErr := c.Conn.WriteJSON(map[string]interface{}{
        "api_key":            "rhsqfT66dxTF7g2O7/t5Cluubjw4MlEz1UoBrZBjf8JocQ/q49j9rH9m",
        "event":              "subscribe",
        "feed":               "fills",
        "original_challenge": challenge,
        "signed_challenge":   signed,
    })
    if writeErr != nil {
        log.Println("write:", writeErr)
    }

    return writeErr
}

Here is my main program:

package main

import (
    "fmt"
    "log"

    "github.com/Mjavala/KrakenAPI/websocket"
)

// message and signed hold the pair most recently returned by Decode in main's
// read loop; both start out empty.
var (
    message string
    signed  string
)

// main opens the WebSocket, subscribes to the heartbeat feed, requests an
// authentication challenge, and then reads and decodes messages until a
// socket error ends the program.
func main() {
    ws, err := websocket.OpenWebSocket()
    if err != nil {
        log.Fatal(err)
    }

    // A failed initial write means nothing useful can follow, so stop here
    // rather than reading from a connection that was never set up.
    if err := ws.HeartBeat(); err != nil {
        log.Fatalf("heartbeat subscribe: %+v\n", err)
    }
    if err := ws.Challenge(); err != nil {
        log.Fatalf("challenge request: %+v\n", err)
    }

    fmt.Println(message, signed)

    for {
        // An error here is a socket-level error.
        payload, err := ws.Next()
        if err != nil {
            log.Fatalf("socket error: %+v\n", err)
        }

        log.Printf("recv: %s", payload)

        // Decode has no error result, so there is nothing further to check.
        message, signed = ws.Decode(payload)
    }
}

I believe that even if the API Keys are wrong a return message should still be sent, as per the API; but instead I get that 1006 close frame.

I'm new to WebSockets. I'm able to get the challenge message and the heartbeat subscription from the API; the problem is specific to the fills subscription.

Also if anyone wants to replicate this, I can give the full code/Git link.

MjBVala
  • 137
  • 3
  • 9
  • Thanks for the feedback! I've updated the question with the code for Heartbeat, Decode, and Close; and moved the Read deadline within the for loop. The close listens for an interrupt signal so it's not closing out at that point. I've also updated with the full output I get. Thanks for the info on the mutex, I'll have to figure out a way to incorporate a single mutex for every writer! – MjBVala Aug 03 '19 at 20:26
  • Thanks for the help, I've updated the code to include everything and added the mutex websocket field! Unfortunately the 1006 error still persists. – MjBVala Aug 05 '19 at 01:56
  • my overall sequence would be send challenge -> read challenge response -> compute sig -> send subscription -> start heartbeat -> loop(read message -> if x message comes back send y message -> read message). So after I'm able to subscribe to fills I'll need to be able to read incoming messages and write messages if a condition is met. Would the approach be to rewrite in a way that makes more sense in the main function - encompassing the goroutines into the Websocket wrapper, or are you saying to call a read/write at every step? thankful you're taking the time to point out flaws in my code. – MjBVala Aug 05 '19 at 04:47
  • I've deleted the interrupt handler and modified the main goroutine - the code has been updated. When I run the program now, the program doesn't run the signature or FillSubscribe methods. Could this be because of the UUID channel? I'm thinking I'll decode within the challenge function as a workaround. – MjBVala Aug 05 '19 at 06:09
  • I've updated the code, most of my logic for incoming messages is in the form of switch statements in the Decode function. As you said, the organization isn't the issue. – MjBVala Aug 05 '19 at 22:14
  • 1
    Close code 1006 indicates that the server closed the connection, not the client as stated in the question. –  Oct 20 '21 at 04:18
  • 1006 means that the server actively closed the connection. According to https://support.cryptofacilities.com/hc/en-us/articles/360000676773-Subscriptions-Web-Sockets-API- your code has to send a ping request at least every 60 seconds. My guess is that if the server does not observe a ping request within a 60-second window, it actively closes the connection to preserve resources. Try adding that to your implementation. Side note: your code seems to contain a legitimate API key for the site. Be careful not to leak those (you might have already done so), and if you did, reset it ASAP. – imgg Jul 09 '22 at 05:36

0 Answers0