2

I'm attempting to read URLs from a queue (RabbitMQ) and make a limited number of concurrent HTTP requests i.e. have a pool of 10 workers making concurrent requests to URLs received from the queue (forever).

So far I've implemented a consumer as per the RabbitMQ tutorials: https://www.rabbitmq.com/tutorials/tutorial-one-go.html

And have tried a number of methods from examples discovered on the web, ending at the example here: http://jmoiron.net/blog/limiting-concurrency-in-go/

Unfortunately, my current code runs for approximately one minute and then freezes indefinitely. I've tried adding/moving go routines around but I can't seem to get it to work as intended (I'm very new to Go).

Current code:

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/Xide/bloom"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

var netClient = &http.Client{
    Timeout: time.Second * 10,
}

func getRequest(url string) {
    //resp, err := http.Get(string(url))
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}

func main() {
    bf := bloom.NewDefaultScalable(0.1)

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "urls",            // name
        true,              // durable
        false,             // delete when unused
        false,             // exclusive
        false,             // no-wait
        nil,               // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, //global
    )
    failOnError(err, "Failed to set Qos")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    concurrency := 10
    sem := make(chan bool, concurrency)
    go func() {
        for d := range msgs {
            sem <- true
            url := string(d.Body)
            if bf.Match(url) == false {
                bf.Feed(url)
                log.Printf("Not seen: %s", d.Body)
                go func(url string) {
                    defer func() { <-sem }()
                    getRequest(url)
                }(url)
            } else {
                log.Printf("Already seen: %s", d.Body)
            }
            d.Ack(false)
        }
        for i := 0; i < cap(sem); i++ {
            sem <- true
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}
user3104123
  • 41
  • 1
  • 5
  • could you add the log output to the question, this will help people see what is going on – lewis Aug 24 '17 at 10:10
  • Try running the program with `-race` flag, it may help you with debugging: https://blog.golang.org/race-detector – Nebril Aug 24 '17 at 10:14
  • With concurrency set to 10 it makes approx 60 HTTP requests (getting gradually slower) and then freezes. Building with -race doesn't provide any info. – user3104123 Aug 24 '17 at 10:27
  • 1
    [net/http documentation](https://golang.org/pkg/net/http) says "The client must close the response body when finished with it:" and I can't spot in your code, where you close the response body. So I conjecture, that all those connection stay open indefinitly. (But with only 60 calls, that shouldn't already be a problem.) – typetetris Aug 24 '17 at 12:02
  • 1
    If I remember correcty, there are also problems, if you don't read the body of the response fully, but I can't find the documentation pointing to that. But I remember doing things like `io.Copy(resp.Body, ioutil.Discard)` or something. Maybe that was superstition. – typetetris Aug 24 '17 at 12:05
  • @Krom not superstition - if you don't read the entire response body before closing it, the connection will not be reused for subsequent requests to the same host: https://stackoverflow.com/questions/17948827/reusing-http-connections-in-golang – Adrian Aug 24 '17 at 13:01

2 Answers2

3

You're not properly handling your HTTP responses, leading to a growing set of open connections. Try this:

func getRequest(url string) {
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    // Add this bit:
    defer func() {
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    }()
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}

This, after you finish reading messages from the channel, seems unnecessary and potentially problematic:

    for i := 0; i < cap(sem); i++ {
        sem <- true
    }

Why fill the sem channel after you've read all the messages from the queue? You've added exactly as many messages to the channel as you expect to read from it, so this is pointless at best, and could cause problems if you make the wrong change to the rest of the code.

Unrelated to your issue, but this is redundant:

if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
}

Per the documentation, Fatalf already exits, so the panic will never be called. If you want to log and panic, try log.Panicf, which is designed for that purpose.

Adrian
  • 42,911
  • 6
  • 107
  • 99
0

You are adding to sem when you get a message, but only removing from sem when you haven't seen a url.

so, once you've "already seen" 10 urls, your app will lock up. So you need to add <-sem to your else statement that logs "Already seen".

Either way, that's a fairly odd way to do this kind of concurrency. Here's a version that does this in a more idiomatic way on Play.

Note, in this version, we just spawn 10 goroutines that listen to the rabbit channel.

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/Xide/bloom"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

var netClient = &http.Client{
    Timeout: time.Second * 10,
}

func getRequest(url string) {
    //resp, err := http.Get(string(url))
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    resp.Body.Close()
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}
func main() {
    bf := bloom.NewDefaultScalable(0.1)

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "urls", // name
        true,   // durable
        false,  // delete when unused
        false,  // exclusive
        false,  // no-wait
        nil,    // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, //global
    )
    failOnError(err, "Failed to set Qos")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    concurrency := 10
    var wg sync.Waitgroup              // used to coordinate when they are done, ie: if rabbit conn was closed
    for x := 0; x < concurrency; x++ { // spawn 10 goroutines, all reading from the rabbit channel
        wg.Add(1)
        go func() {
            defer wg.Done() // signal that this goroutine is done
            for d := range msgs {
                url := string(d.Body)
                if bf.Match(url) == false {
                    bf.Feed(url)
                    log.Printf("Not seen: %s", d.Body)
                    getRequest(url)
                } else {
                    log.Printf("Already seen: %s", d.Body)
                }
                d.Ack(false)
            }
            log.Println("msgs channel closed")
        }()
    }

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    wg.Wait() // when all goroutine's exit, the app exits
}
t56k
  • 6,769
  • 9
  • 52
  • 115
David Budworth
  • 11,248
  • 1
  • 36
  • 45
  • The above example exits with: `panic: sync: negative WaitGroup counter` @david-budworth – user3104123 Aug 24 '17 at 15:28
  • Updated the example, I neglected to initialize the Waitgroup with the number of workers (concurrency). I can't actually run the app as I don't have whatever submits items, so you may have to adjust a bit. The point was more to show an alternative way and to explain why your solution was hanging. – David Budworth Aug 25 '17 at 04:18
  • `var wg sync.Waitgroup` => `var wg sync.WaitGroup` and add `wg.Add(concurrency)` – Cui Dec 05 '18 at 16:38
  • @Cui yes, that is more direct, to just `wg.Add(concurrency)`, but I generally prefer to have an obvious connection between `wg.Add(1)` and `wg.Done()`. Plus, if I later (incorrectly) change the for loop to be `<= concurrency` the program will still behave correctly, even though it has an off-by-one error in the loop – David Budworth Jan 26 '19 at 15:34