1

I am attempting to use ZeroMQ within ECS running on Fargate in awsvpc mode. I have 2 different services, each running its own task with service discovery enabled.

I create my Router and Dealer in a microservice called broker.

front, _ := zmq.NewSocket(zmq.ROUTER)
defer front.Close()
front.Bind("tcp://*:4070")

back, _ := zmq.NewSocket(zmq.DEALER)
defer back.Close()
back.Bind("tcp://*:4080")

I then add these 2 sockets to a poller and have a for loop that waits for messages.

I have a separate microservice that connects to the socket and attempts to send a message to the dealer. I have set service discovery so I assume the address I connect to would be:

"tcp://broker:4070"

Below is the code from 'serviceA'

func New(ZMQ models.ZMQ) *Requester {
    s, err := zmq.NewSocket(zmq.REQ)
    if err != nil {
        log.Fatalln("shareholder/requester zmq.NewSocket", err)
    }
    p := zmq.NewPoller()
    p.Add(s, zmq.POLLIN)

    log.Println("Requester", ZMQ.Req)
    err = s.Connect("tcp://broker:4070")
    if err != nil {
        log.Print(fmt.Errorf("err is %w", err))
    }

    req := &Requester{
        Poller:  p,
        Retries: 2,
        Socket:  s,
        Timeout: time.Duration(time.Minute),
    }
    runtime.SetFinalizer(req, (*Requester).Close)
    return req
}

I then use the above code to send a message with my socket connection

_, err := r.Socket.SendMessage(req)

However, my message is never received within my broker service. I can hit my REST APIs on the network with their hostnames I register during service discovery, is there something I am missing here with Fargate/ECS/ZeroMQ???

N P
  • 2,319
  • 7
  • 32
  • 54
  • 1
    If you use [`awsvpc`](https://aws.amazon.com/de/blogs/compute/task-networking-in-aws-fargate/) your container behave as if they all would be on `localhost`. So you can call them with `localhost:4070`. – codedge May 02 '20 at 22:49
  • They are 2 different services within ECS, I thought you can only do localhost:4070 if they are on the same task definition in one service? – N P May 03 '20 at 08:47
  • That is correct - only if there are on the same service. If you want to have a communication across different services, service A and B, you have to use [service discovery](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-service-discovery.html). This can be enabled during service creation. – codedge May 03 '20 at 08:53
  • I've enabled service discovery already and no luck! – N P May 03 '20 at 09:07
  • _DNS records for a service discovery service can be queried within your VPC. They use the following format: .._ Did you [verify service discovery works](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/create-service-discovery.html#create-service-discovery-verify)? Calling services with `tcp://broker:4070` across different services is definitely not possible. – codedge May 03 '20 at 09:18
  • I can see in Route53 the DNS record it's added for the broker service. The service discovery works fine for my gRPC and rest services. I can query http://config:50050/ for example. If tcp://broker:4070 wouldn't work across services how would I go about connecting to the tcp socket from a different service? – N P May 03 '20 at 09:23
  • _[...] how would I go about connecting to the tcp socket from a different service? – Nick Pocock 20 mins ago_: You would use service discovery with this format _._ to access your broker in another service. You can check AWS [`allow communication across services`](https://aws.amazon.com/de/premiumsupport/knowledge-center/ecs-tasks-services-communication/) – codedge May 03 '20 at 09:47
  • Using ZeroMQ calling tcp://broker:4070 is a valid way to connect across services. I've enabled service discovery. I think maybe you aren't getting what my issue is – N P May 03 '20 at 10:05

2 Answers2

1

Q : "is there something I am missing here with Fargate/ECS/ZeroMQ???"

Maybe yes, maybe no.

Let's start in a structured way to drill down to a root-cause :

Step 0: a Broker Service Node

ZeroMQ was mentioned to be used, so we'll start from this point. Given your choice was to use an AccessPoint to DEALER on address ( *:4070 ) and an AccessPoint to ROUTER on address ( *:4080 ), and both using a .bind()-method for activating a tcp://-Transport Class inside a Broker-Microservice Node, our next step is to validate, whether and how is this Node actually visible for the rest of the world.

So, let it run.

Step 1: a Line-of-Sight Test

This is a first step to test - is the Broker-Node, whatever is its implementation, actually visible for the "intended audience" ? If not, there is not much to do about it inside ZeroMQ or other frameworks, but your task is to get the addresses, L1-signal interconnection, L2-arp/rarp MAC-detection/mapping, L3-routing permissions/access-lists/filters/xlations/etc, (dynamic) DNS-updates and all other configurations updated, so that you enable the (selective part of the) rest of the world see and get one step closer to do a successful .connect()

$ #                                 is it L3-(in)-visible # a [ PASS ] | [ FAIL ]
$ ping <_a_Broker_Node_Assumed_TCP/IP_Address>            # a [ PASS ] | [ FAIL ]

Step 2: a port-number RTO-Test

$ #                                                  4070 # a [ PASS ] | [ FAIL ]
$ netcat -vz <_a_Broker_Node_visible_TCP/IP_Address> 4070 # a [ PASS ] | [ FAIL ]
$ ######
$ # OR :
$ ######
$ telnet     <_a_Broker_Node_visible_TCP/IP_Address> 4070 # a [ PASS ] | [ FAIL ]
Trying 
Connected to 
Escape character is '^]'.
https://<_a_Broker_Node_visible_TCP/IP_Address>:4070
HTTP/1.1 400 Bad Request
Server: nginx
Date: Mon, 03 May 2020 18:14:54 GMT
Content-Type: text/html
Content-Length: 150
Connection: close

<html>
<head><title>400 Bad Request</title></head>
<body>
<center><h1>400 Bad Request</h1></center>
<hr><center>nginx</center>
</body>
</html>
Connection closed by foreign host.
$
$ //                                             4080 // a [ PASS ] | [ FAIL ]
$ telnet <_a_Broker_Node_visible_TCP/IP_Address> 4080 // a [ PASS ] | [ FAIL ]

Step 3: an RTO-test of a local message sending

Replace the rather complicated realm of the REQ/ROUTER-Scalable Formal Communications Archetype Pattern, and let's test with a straightforward PUSH/PULL-message delivery test, which (for obvious reasons) matches the intended use for sending a message:

package main

import (
    zmq "github.com/pebbe/zmq4"
    "log"
    "fmt"
    "time"
    ...
)

func PushTASK() {

    aCtx, err    := zmq.NewContext()
    if err != nil {
        log.Fatalln( "__NACK: aCtx instantiation failed in zmq.NewContext()",
                      err )
    }

    aPusher, err := aCtx.NewSocket( zmq.PUSH )
    if err != nil {
        log.Fatalln( "__NACK: aPusher instantiation failed in aCtxNewSocket()",
                      err )
    }

    err = aPusher.SetLinger( 0 )
    if err != nil {
        log.Fatalln( "__NACK: aPusher instance failed to .SetLinger()",
                      err  )
    }

    err = aPusher.SetConflate( true )
    if err != nil {
        log.Fatalln( "__NACK: aPusher instance failed to .SetConflate()",
                      err  )
    }

    log.Println( "POSACK: aPusher instantiated and about to .connect( tcp://addr:port#)" )

    err = aPusher.Connect( "tcp://broker:4070" )
    if err != nil {
        log.Print( fmt.Errorf( "__NACK: aPusher failed to .connect(): %w",
                                err )
                   )
    }

    log.Println( "POSACK: aPusher RTO and about to .SendMessage*()-loop" )

    for aPush_NUMBER := 1; aPush_NUMBER < 10000; aPush_NUMBER++ {

        err = aPusher.SendMessageDontwait( aPush_NUMBER )
        if err != nil {
              log.Print( fmt.Errorf( "__NACK: aPusher failed to .SendMessageDontwait()[%d]: %w",
                                      aPush_NUMBER,
                                      err )
                         )
        }

        time.Sleep( 0.1 * time.Second )
    }
 // ---------------------------------------------------BE NICE TO RESOURCES USED
    err = aPusher.Disconnect( "tcp://broker:4070" )
    if err != nil {
        log.Print( fmt.Errorf( "__NACK: aPusher failed to .Disconnect( tcp://addr:port ): %w",
                                err )
                   )
    }
 // ---------------------------------------------------BE NICE TO RESOURCES USED
    err = aPusher.Close()
    if err != nil {
        log.Print( fmt.Errorf( "__NACK: aPusher failed to .Close(): %w",
                                err )
                   )
    }
 // ---------------------------------------------------BE NICE TO RESOURCES USED
    err = aCtx.Term()
    if err != nil {
        log.Print( fmt.Errorf( "__NACK: aCtx failed to .Term(): %w",
                                err )
                   )
    }
 // ---------------------------------------------------WE ARE CLEAR TO TERMINATE
}

Step 4: an RTO-test of a remote message receipt

If none of the [ PASS ] | [ FAIL ]-tests 've crashed, the next step is to reflect the PUSH-side concept for the "remote" Broker, yes, rewriting it to use a PULL-side and deploy it to see, if there are no crashes either and whether the messages arrive as they ought to in the still running or re-run the Step 3.

Step 5: Enjoy the powers of ZeroMQ

Once all the tests above indeed do [ PASS ], you are not only sure the ZeroMQ was not the show-stopper, but also may enhance the deployed principles into any further use-case scenarios, given the L1-/L2-/L3-/ZeroMQ-services were put in place in a correct and verifiable manner.

halfer
  • 19,824
  • 17
  • 99
  • 186
user3666197
  • 1
  • 6
  • 50
  • 92
  • 1
    Brilliant, detailed answer. It turns out it was some failing health checks on my ZeroMQ container which meant my Route 53 DNS records were being removed causing my service discovery to break. Thanks for the time and effort of this post. – N P May 04 '20 at 11:41
-1

I would describe my thinking as an answer and I am sure we can figure your problem out.

So I think this is your setup.

Setup services

Service A

  • nginx calls backend via backend:9000
  • backend calls nginx via nginx:80

Service A to B

  • nginx cannot call broker1 via broker1:4070
  • Neither nginx nor backend cann can call broker1 or broker2 by just specifying name:port.

If the container running in different services and each service has its own awsvpc you cannot call them just by specifying the name:port.

You need to a connection across services, from A to B, which means you need proper Service Discovery.

codedge
  • 4,754
  • 2
  • 22
  • 38
  • So my system is set up like this. I have one VPC. I have then 3 services all within that one VPC. Each service within Fargate has service discovery enabled. The traffic is all between services, nothing from outside my VPC. My broker has a DNS record in route53 which is broker.prod I then call tcp://broker.prod:4070 to connect to it from a different service – N P May 03 '20 at 11:23
  • Gotcha, all services withing _one_ vpc. Do you have as `task-definition.json` you can provide that is executed when deploying? – codedge May 03 '20 at 11:24