
I'm having a hard time understanding goroutines, channels and the whole sync story. I believe I understand the concepts, but I'm missing a couple of connections between all the pieces of information I have. Also, the majority of examples feel too simple, so I can't get a proper grasp of what's actually happening.

I'm writing a simple analytics tool for websites. One of the features is to check whether all links on a website are reachable. Obviously, there are many links on each website, so it seems like a great candidate for goroutines. The thing is that after spawning all the goroutines I need to collect all the results in order to present them to the user at once.

What I have so far is:

func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    wg := sync.WaitGroup{}

    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        go func() {
            wg.Add(1)
            href, _ := item.Attr("href")
            url, _ := url.Parse(href)
            var internal bool

            if url.Host == "" {
                url.Scheme = u.Scheme
                url.Host = u.Host
            }

            links = append(links, models.Link{
                URL:       url,
                Reachable: Reachable(url.String()),
            })

            wg.Done()
        }()
    })

    wg.Wait()

    return
}

func Reachable(u string) bool {
    res, err := http.Head(u)
    if err != nil {
        return false
    }

    return res.StatusCode == 200
}

My code seems to work, but I feel like I'm missing something (or at least that it could be better). I have a couple of concerns/questions:

  1. If the website contained 1000 links, I'd spawn 1000 goroutines, which I believe is not so smart. I'd probably need a worker pool or something similar (roughly like the sketch after this list?), right?
  2. Is it possible to use only channels for this example? I don't know how many links goquery will find, so I can't easily range over elements sent to a channel. I also can't easily send some done message to another channel, because I don't know when this Each will end. Every for range over such a channel blocks, so the app becomes synchronous again.
  3. I believe this is a common situation in applications: you iterate over something, want to do some async work on each iteration, and gather all the results when it's over. I can't wrap my mind around this concept and can't come up with how to approach such a case.
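
What I vaguely have in mind for question 1 is something like the rough worker-pool sketch below. The checkLinks name, the channels and the worker count are just placeholders, not code I actually have; it only reuses my models.Link and Reachable:

// Rough worker-pool idea (placeholder function, reuses models.Link and Reachable):
// a fixed number of workers read hrefs from a jobs channel and send results on a results channel.
func checkLinks(hrefs []string, workers int) []models.Link {
    jobs := make(chan string)
    results := make(chan models.Link)

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Each worker keeps pulling work until the jobs channel is closed.
            for href := range jobs {
                u, err := url.Parse(href)
                if err != nil {
                    continue
                }
                results <- models.Link{URL: u, Reachable: Reachable(u.String())}
            }
        }()
    }

    // Feed the jobs and close the channel so the workers' loops can end.
    go func() {
        for _, href := range hrefs {
            jobs <- href
        }
        close(jobs)
    }()

    // Close results once every worker has finished, so the collecting loop below ends.
    go func() {
        wg.Wait()
        close(results)
    }()

    var links []models.Link
    for l := range results {
        links = append(links, l)
    }
    return links
}
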
cojoj
  • Use a worker pool: [Is this an idiomatic worker thread pool in Go?](https://stackoverflow.com/questions/38170852/is-this-an-idiomatic-worker-thread-pool-in-go/38172204#38172204). Although 1000 goroutines isn't particularly many, you could also write results in a slice: [Can I concurrently write different slice elements](https://stackoverflow.com/questions/49879322/can-i-concurrently-write-different-slice-elements/49879469#49879469) – icza Feb 05 '21 at 14:45
  • You cannot call `wg.Add(1)` from within the goroutine. – JimB Feb 05 '21 at 14:51
  • I'm not familiar with the api you're showing here, but I assume the document is already fetched and you are simply searching the elements? In this example the concurrency overhead is probably far greater than the time it takes to process each value. – JimB Feb 05 '21 at 14:55
  • You are appending to a slice inside of a goroutine; slices are not thread safe. – Steven Masley Feb 05 '21 at 14:56
  • slices are not special, no values are safe for concurrent reads and writes. – JimB Feb 05 '21 at 15:06
  • @JimB Document is fetched, but I still need to check each element (make an HTTP request), so it is time-consuming. – cojoj Feb 05 '21 at 15:08
  • @cojoj: ah yes, something like a network request would make this reasonable, but 1000 concurrent network requests is probably not the most efficient use of the network resources. Look at all the examples of worker pools for this (or use a semaphore). – JimB Feb 05 '21 at 15:12
  • @JimB you can definitely call `wg.Add(1)` within a goroutine; it is threadsafe. I think you just want to avoid calling it within the goroutine because the first `wg.Add(1)` might be called after `wg.Wait()`. – maxm Feb 05 '21 at 16:08
  • @maxm, well yes, you don't call it inside the goroutine because it doesn't work, not because it isn't safe to do so. – JimB Feb 05 '21 at 16:21
  • ah, true, understood – maxm Feb 05 '21 at 16:25
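
A minimal sketch of the slice approach icza links above, reusing the question's types: call `wg.Add(1)` before launching each goroutine (per JimB's comment), and have every goroutine write only to its own pre-allocated element of the result slice, so no locking is needed. This still does not bound the number of concurrent requests; the answer below handles that with a semaphore.

func links(u *url.URL, d *goquery.Document) []models.Link {
    sel := d.Find("a[href]")
    results := make([]models.Link, sel.Length()) // one pre-allocated slot per link

    var wg sync.WaitGroup
    sel.Each(func(index int, item *goquery.Selection) {
        wg.Add(1) // add before starting the goroutine, not inside it
        go func() {
            defer wg.Done()

            href, _ := item.Attr("href")
            link, err := url.Parse(href)
            if err != nil {
                return // leave the zero value in place for an unparsable href
            }
            if link.Host == "" {
                link.Scheme = u.Scheme
                link.Host = u.Host
            }

            // Each goroutine writes only to its own element, so no mutex is needed.
            results[index] = models.Link{
                URL:       link,
                Reachable: Reachable(link.String()),
            }
        }()
    })

    wg.Wait()
    return results
}
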

1 Answer


You could use a semaphore to constrain the concurrency. This still spawns "1000 goroutines", but it ensures that only 5 HTTP requests are in flight at any given time. You can change the value of `maxParallel` to increase or decrease the number of parallel requests.

func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    wg := sync.WaitGroup{}
    linkChan := make(chan models.Link)
    doneChan := make(chan struct{})
    maxParallel := 5
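    // A buffered channel used as a counting semaphore: at most maxParallel goroutines can hold a slot (i.e. run an HTTP request) at once.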
    semaphore := make(chan struct{}, maxParallel)
    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        wg.Add(1)
        go func() {
            semaphore <- struct{}{}
            href, _ := item.Attr("href")
            url, _ := url.Parse(href)

            if url.Host == "" {
                url.Scheme = u.Scheme
                url.Host = u.Host
            }
            linkChan <- models.Link{
                URL:       url,
                Reachable: Reachable(url.String()),
            }
            wg.Done()
            <-semaphore
        }()
    })
    go func() {
        wg.Wait()
        doneChan <- struct{}{}
    }()

    // Drain the channel
    for {
        select {
        case l := <-linkChan:
            links = append(links, l)
        case <-doneChan:
            return
        }
    }
    return
}

func Reachable(u string) bool {
    res, err := http.Head(u)
    if err != nil {
        return false
    }

    return res.StatusCode == 200
}
maxm
  • It looks really nice, but this function never returns – cojoj Feb 05 '21 at 16:34
  • I've changed `break` to `return` in your sample and it seems to work as expected. – cojoj Feb 05 '21 at 16:37
  • @cojoj I made an edit after first posting, are you running the version with doneChan? – maxm Feb 05 '21 at 16:37
  • @cojoj huh, strange – maxm Feb 05 '21 at 16:37
  • oh, that's right, it's just breaking out of the select, I'll update it to be `return` – maxm Feb 05 '21 at 16:38
  • Also, this last return won't be needed as it'll never be reached. – cojoj Feb 05 '21 at 16:38
  • Also one more question. Why did you put `wg.Wait()` inside another goroutine? Is there any specific reason for doing that? – cojoj Feb 05 '21 at 16:40
  • You need three goroutines, and one of the goroutines is the current function's execution. You could put the aggregation in a separate goroutine instead and have the `wg.Wait()` in the function. This method (the answerer's) is better I think because you don't have to send the results over a channel at the end. – Tyler Kropp Feb 05 '21 at 16:50
  • There are a few ways you could do this, but `linkChan` is not buffered (it is a channel of size 0), so if you don't put the `wg.Wait()` in a goroutine and instead call it before draining `linkChan` in the select, the execution will simply block at `linkChan <- models.Link` – maxm Feb 05 '21 at 17:58
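
For completeness, a sketch of the alternative these last comments point at: instead of `doneChan` and the select loop, close `linkChan` in the goroutine that calls `wg.Wait()` and range over the channel, which also removes the unreachable final `return`. Everything else is assumed to stay the same as in the answer.

func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    var wg sync.WaitGroup
    linkChan := make(chan models.Link)
    maxParallel := 5
    semaphore := make(chan struct{}, maxParallel)

    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            semaphore <- struct{}{}        // acquire a slot
            defer func() { <-semaphore }() // release it when done

            href, _ := item.Attr("href")
            link, err := url.Parse(href)
            if err != nil {
                return // skip hrefs that don't parse
            }
            if link.Host == "" {
                link.Scheme = u.Scheme
                link.Host = u.Host
            }
            linkChan <- models.Link{
                URL:       link,
                Reachable: Reachable(link.String()),
            }
        }()
    })

    // Close linkChan once every sender is done, so the range below terminates.
    go func() {
        wg.Wait()
        close(linkChan)
    }()

    for l := range linkChan {
        links = append(links, l)
    }
    return
}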