
I have a resourceId array that I need to loop over in parallel, generating a URL for each resource and putting it into a map whose key is the resourceId and whose value is the URL.

I have the code below, which does the job, but I am not sure it is the right way to do it. I am using sizedwaitgroup to parallelize over the resourceId list, and I take a lock on the map while writing data to it. I suspect this isn't efficient, since using a lock together with sizedwaitgroup will have some performance cost.

What is the best and most efficient way to do this? Should I use channels here? I want to control the degree of parallelism myself instead of running one goroutine per element of the resourceId list. If URL generation fails for any resourceId, I want to log an error for that resourceId without disrupting the other goroutines that are generating URLs for the remaining resourceIds in parallel.

For example, if there are 10 resources and 2 fail, log an error for those 2, and the map should have entries for the remaining 8.

// run at most 20 goroutines concurrently
swg := sizedwaitgroup.New(20)
var mutex = &sync.Mutex{}
start := time.Now()
m := make(map[string]*customerPbV1.CustomerResponse)
for _, resources := range resourcesList {
  swg.Add()
  go func(resources string) {
    defer swg.Done()
    customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
    if err != nil {
      errs.NewWithCausef(err, "Could not generate the url for %s", resources)
    }
    mutex.Lock()
    m[resources] = customerUrl
    mutex.Unlock()
  }(resources)
}
swg.Wait()

elapsed := time.Since(start)
fmt.Println(elapsed)

Note: the code above will be called at high throughput from multiple reader threads, so it needs to perform well.
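For readers who have not used sizedwaitgroup: the bounded concurrency it provides can be reproduced with the standard library alone, using a buffered channel as a counting semaphore next to a plain sync.WaitGroup (roughly the same idea sizedwaitgroup wraps). The sketch below is an illustration under assumptions: generateURL is a hypothetical stub standing in for us.GenerateUrl, URLs are plain strings instead of *customerPbV1.CustomerResponse, and failed IDs are logged and skipped rather than stored as nil entries.

```go
package main

import (
	"fmt"
	"log"
	"strings"
	"sync"
)

// generateURL is a hypothetical stub for us.GenerateUrl: it fails for any
// ID starting with "bad-" and otherwise returns a fake URL.
func generateURL(id string) (string, error) {
	if strings.HasPrefix(id, "bad-") {
		return "", fmt.Errorf("could not generate the url for %s", id)
	}
	return "https://example.com/" + id, nil
}

// buildURLMap runs at most limit calls to generateURL at a time.
// The buffered channel sem works as a counting semaphore.
func buildURLMap(ids []string, limit int) map[string]string {
	sem := make(chan struct{}, limit)
	var (
		wg sync.WaitGroup
		mu sync.Mutex
	)
	m := make(map[string]string, len(ids))
	for _, id := range ids {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit goroutines are already running
		go func(id string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next ID
			url, err := generateURL(id)
			if err != nil {
				log.Printf("resource %s: %v", id, err) // log and skip; other IDs continue
				return
			}
			mu.Lock() // held only for one map assignment
			m[id] = url
			mu.Unlock()
		}(id)
	}
	wg.Wait()
	return m
}

func main() {
	ids := []string{"a", "b", "bad-c", "d", "e", "f", "bad-g", "h", "i", "j"}
	m := buildURLMap(ids, 3)
	fmt.Println(len(m))
}
```

Because the mutex is held only for a single map assignment, which is likely cheap next to generating a URL, lock contention is unlikely to be the bottleneck here; a benchmark against the real GenerateUrl would be the way to confirm that.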

AndyP

3 Answers


I'm not sure what sizedwaitgroup is and it's not explained, but overall this approach doesn't look very typical of Go. For that matter, "best" is a matter of opinion, but the most typical approach in Go would be something along these lines:

func main() {
    wg := new(sync.WaitGroup)
    start := time.Now()
    numWorkers := 20
    m := make(map[string]*customerPbV1.CustomerResponse)
    work := make(chan string)
    results := make(chan result)
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(wg, work, results)
    }
    // Feed the work channel from its own goroutine so main is free to read results.
    go func() {
        for _, resources := range resourcesList {
            work <- resources
        }
        close(work) // ends each worker's range loop once all work is published
    }()

    // Close results only after every worker has called wg.Done.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Only this goroutine writes to m, so no mutex is needed.
    for result := range results {
        m[result.resources] = result.response
    }

    elapsed := time.Since(start)
    fmt.Println(elapsed)
}

type result struct {
    resources string
    response  *customerPbV1.CustomerResponse
}

func worker(wg *sync.WaitGroup, ch <-chan string, r chan<- result) {
    defer wg.Done() // without this, wg.Wait never returns and main hangs
    for w := range ch {
        customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
        if err != nil {
            errs.NewWithCausef(err, "Could not generate the url for %s", w)
            continue
        }
        r <- result{w, customerUrl}
    }
}

Based on the name, though, I would assume errs.NewWithCausef doesn't actually handle errors but returns one, in which case the current code is dropping them on the floor. A proper solution would have an additional chan error for handling errors:

func main() {
    wg := new(sync.WaitGroup)
    start := time.Now()
    numWorkers := 20
    m := make(map[string]*customerPbV1.CustomerResponse)
    work := make(chan string)
    results := make(chan result)
    errCh := make(chan error)
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go worker(wg, work, results, errCh)
    }

    go func() {
        for _, resources := range resourcesList {
            work <- resources
        }
        close(work)
    }()

    go func() {
        wg.Wait()
        close(results)
        close(errCh)
    }()

    // errDone is closed once every error has been handled, so main does not
    // return while the last error is still being processed.
    errDone := make(chan struct{})
    go func() {
        defer close(errDone)
        for err := range errCh {
            log.Println(err) // or handle err some other way
        }
    }()

    for result := range results {
        m[result.resources] = result.response
    }
    <-errDone

    elapsed := time.Since(start)
    fmt.Println(elapsed)
}

type result struct {
    resources string
    response  *customerPbV1.CustomerResponse
}

// The error channel is named errCh, not errs, so it does not shadow the errs package.
func worker(wg *sync.WaitGroup, ch <-chan string, r chan<- result, errCh chan<- error) {
    defer wg.Done()
    for w := range ch {
        customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
        if err != nil {
            errCh <- errs.NewWithCausef(err, "Could not generate the url for %s", w)
            continue
        }
        r <- result{w, customerUrl}
    }
}
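A self-contained variant of the error-channel approach that runs as-is, under these assumptions: generateURL is a hypothetical stub for us.GenerateUrl, URLs are plain strings, and collect (also a hypothetical name) returns the errors instead of logging them so the result is easy to check. The errDone channel makes collect wait until the error consumer has drained errCh before returning; the order in which errCh and results are closed does not, on its own, guarantee that.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// result pairs a resource ID with its generated URL.
type result struct {
	id, url string
}

// generateURL is a hypothetical stub for us.GenerateUrl.
func generateURL(id string) (string, error) {
	if strings.HasPrefix(id, "bad-") {
		return "", fmt.Errorf("could not generate the url for %s", id)
	}
	return "https://example.com/" + id, nil
}

// worker sends successes on results and failures on errCh until work is closed.
func worker(wg *sync.WaitGroup, work <-chan string, results chan<- result, errCh chan<- error) {
	defer wg.Done()
	for id := range work {
		url, err := generateURL(id)
		if err != nil {
			errCh <- err
			continue
		}
		results <- result{id, url}
	}
}

// collect runs numWorkers workers and returns the URL map plus every error.
func collect(ids []string, numWorkers int) (map[string]string, []error) {
	work := make(chan string)
	results := make(chan result)
	errCh := make(chan error)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(&wg, work, results, errCh)
	}
	go func() { // producer
		for _, id := range ids {
			work <- id
		}
		close(work)
	}()
	go func() { // close both output channels once all workers are done
		wg.Wait()
		close(results)
		close(errCh)
	}()
	var errs []error
	errDone := make(chan struct{})
	go func() { // error consumer
		defer close(errDone)
		for err := range errCh {
			errs = append(errs, err)
		}
	}()
	m := make(map[string]string)
	for r := range results {
		m[r.id] = r.url
	}
	<-errDone // errs is safe to read only after the consumer has finished
	return m, errs
}

func main() {
	ids := []string{"a", "b", "bad-c", "d", "e", "f", "bad-g", "h", "i", "j"}
	m, errs := collect(ids, 3)
	for _, err := range errs {
		fmt.Println("error:", err)
	}
	fmt.Println(len(m), "URLs generated")
}
```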
Adrian
  • Thanks for your suggestion. I think I get the idea now, but I'm slightly confused about the `worker` method. It should take input parameters for `clientId`, `resources` and `appConfig` too, right? `clientId` is just an int32 declared just before the for loop, `resources` comes from iterating over `resourcesList` in the for loop, and `appConfig` is also declared just before the for loop. So I'm confused about how to pass `resources` to the `worker` method. – AndyP Feb 25 '22 at 00:09
  • or this line `us.GenerateUrl(clientId, resources, appConfig)` should be `us.GenerateUrl(clientId, w, appConfig)` instead? – AndyP Feb 25 '22 at 00:13
  • `clientId` and `appConfig` maybe, I don't know where they come from. `resources` no, it gets that from the channel, that's the point of the example code. You are correct about the typo in the `GenerateUrl` line, I'll correct it. – Adrian Feb 25 '22 at 15:31
  • Thanks Adrian. yeah I fixed that already but somehow I am noticing there is a bug in your suggestion. I am seeing `us.GenerateUrl` is returning url back but my map doesn't have it at all. Also program just hangs whenever I make a request using your suggestion. It hangs for valid working case and also for cases where there is an error with `Could not generate the url`. Any idea what could be wrong? I tried debugging but couldn't figure it out. – AndyP Feb 26 '22 at 00:19
  • Are you still around to help me out? I can't figure out why my request hangs in the console for both valid and invalid requests. For some reason my map isn't being populated at all, and `fmt.Println(elapsed)` never prints. I also added a few other debug prints, and none of the ones after the for loop that populates the map get printed. Not sure what's wrong. – AndyP Feb 27 '22 at 20:56
  • @AndyP sounds like you've made some changes and are having a different issue, so I'd recommend opening a new question for your current problem. – Adrian Feb 28 '22 at 16:10
  • I think there is some issue in your suggestion where code is hanging when I try the request. I didn't modify anything in your suggested code. I used as it is and somehow it is hanging for some reason. @Adrian – AndyP Feb 28 '22 at 16:44
  • @AndyP you'll definitely have to modify it, it's just an example. My answer outlines an approach to solving the problem, but you'll need to tailor it to your specific use case. – Adrian Feb 28 '22 at 16:48
  • Understood. got it now. Thanks for your help! – AndyP Feb 28 '22 at 16:55
  • If you're getting hard stops, you likely just need to put the channel filler in its own goroutine. I'll make another quick edit. – Adrian Feb 28 '22 at 16:57
  • Just saw the edit and I think it makes sense. Do you think we should also do this `close(work)` inside that same go routine too? – AndyP Feb 28 '22 at 17:12
  • Yes, sorry, close should be after all work is published. – Adrian Feb 28 '22 at 18:37

I have created example code with comments; please read the comments.

Note: the query function sleeps for 1 second.

package main

import (
    "errors"
    "fmt"
    "log"
    "math/rand"
    "runtime"
    "strconv"
    "sync"
    "time"
)

type Result struct {
    resource string
    val      int
    err      error
}

/*
CHANGE the Result struct to this;
it collects everything you need to build the map:
type Result struct {
    resources   string
    customerUrl *customerPbV1.CustomerResponse
    err         error
}
*/

// const numWorker = 8

func main() {
    now := time.Now()
    rand.Seed(time.Now().UnixNano())
    m := make(map[string]int)
    // m := make(map[string]*customerPbV1.CustomerResponse) // CHANGE TO THIS

    numWorker := runtime.NumCPU()
    fmt.Println(numWorker)
    chanResult := make(chan Result) // jobs: one Result per resource, completed by a worker

    // Producer: send every job, then close chanResult so the workers' range loops end.
    go func() {
        for i := 0; i < 20; i++ {
            /*
             customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
             we assume i is the resource ID
             chanResult <- Result{resource: strconv.Itoa(i)}
            */
            chanResult <- Result{ // blocks until a worker receives it in its range loop
                resource: strconv.Itoa(i),
            }
        }
        close(chanResult)
    }()

    var wg sync.WaitGroup
    cr := make(chan Result) // completed results from the workers
    wg.Add(numWorker)

    go func() {
        wg.Wait()
        close(cr) // NOTE: don't forget to close cr, or the range over cr below never ends
    }()

    go func() {
        for i := 0; i < numWorker; i++ { // start numWorker worker goroutines
            go func(id int) {
                defer wg.Done()
                for job := range chanResult { // receives the jobs sent by the producer
                    log.Println("worker", id, "working on", job.resource)
                    val, err := query(job.resource) // TODO: customerUrl, err := us.GenerateUrl(clientId, job.resource, appConfig)
                    cr <- Result{ // blocks until the main goroutine receives it in the range over cr
                        resource: job.resource,
                        val:      val,
                        err:      err,
                    }
                }
            }(i)
        }
    }()

    counterTotal := 0
    counterSuccess := 0
    for res := range cr { // receives each result sent by a worker
        if res.err != nil {
            log.Printf("error found %s. stack trace: %s", res.resource, res.err)
        } else {
            m[res.resource] = res.val // NOTE: save to map
            counterSuccess++
        }
        counterTotal++
    }
    log.Printf("%d/%d of total job run", counterSuccess, counterTotal)
    fmt.Println("final :", m)
    fmt.Println("len m", len(m))

    fmt.Println(runtime.NumGoroutine())
    fmt.Println(time.Since(now))
}

func query(s string) (int, error) {
    time.Sleep(time.Second)
    i, err := strconv.Atoi(s)
    if err != nil {
        return 0, err
    }

    if i%3 == 0 {
        return 0, errors.New("i divided by 3")
    }
    ms := i + 500 + rand.Intn(500)
    return ms, nil
}

Playground: https://go.dev/play/p/LeyE9n1hh81

Rahmat Fathoni
  • Thanks Rahmat. I am looking at your playground to understand more, but do you by any chance see anything wrong in this [play](play.golang.com/p/PjTx5aKWwM5)? Somehow my request hangs for both valid and invalid cases. – AndyP Feb 28 '22 at 02:07
  • https://go.dev/play/p/X0KMp-n3HrB – Rahmat Fathoni Feb 28 '22 at 02:48
  • Add a *sync.WaitGroup parameter to worker. Run the `for _, resources := range resourcesList {}` loop inside a goroutine instead of running it and `close(work)` in the main goroutine, so the main goroutine can listen on the results channel. – Rahmat Fathoni Feb 28 '22 at 02:52
  • Hmm, that was a silly mistake, I guess. It works now after your change. Can you tell me why we needed `defer wg.Done()` in the worker method? If we do this every time in the worker method, does that mean we are creating a WaitGroup for each resource? Just trying to learn. Also, which approach is better: the one in your answer or the one in my play? – AndyP Feb 28 '22 at 04:25
  • `1.` `defer wg.Done()` is what allows the channels `results` and `errs` to be closed `(line 43-47)`; if these channels are not closed, it will deadlock on line 56. `2.` We don't create a WaitGroup every time worker is called, because it is passed by pointer (pointer semantics), so it's the same WaitGroup. `3.` It's up to you. The difference is how errors are handled. In my answer they are saved in the struct; in your play they are handled in a separate goroutine (so error handling may sometimes run late `if channel results closed first`). – Rahmat Fathoni Feb 28 '22 at 05:00
  • Thanks understood now. Can you explain what does this line `maybe sometimes err handle will run late if channel results closed first` mean? And how to handle that better in my play? – AndyP Feb 28 '22 at 07:01
  • see this image https://ibb.co/Jn6xJgG. But this bug is simple to solve, `just close channel errs first then close channel results` – Rahmat Fathoni Feb 28 '22 at 07:12

Here is a pure-channel solution (playground). I think the performance really depends on GenerateUrl (generateURL in my code). One more thing worth pointing out: the correct term for this is concurrency, not parallelism.

package main

import (
    "errors"
    "log"
    "strconv"
    "strings"
)

type result struct {
    resourceID, url string
    err             error
}

func generateURL(resourceID string) (string, error) {
    if strings.HasPrefix(resourceID, "error-") {
        return "", errors.New(resourceID)
    }
    return resourceID, nil
}

func main() {
    // These are the resource IDs
    resources := make([]string, 10000)
    for i := 0; i < 10000; i++ {
        s := strconv.Itoa(i)
        if i%10 == 0 {
            resources[i] = "error-" + s
        } else {
            resources[i] = "id-" + s
        }
    }

    numOfChannel := 20
    // We send result through this channel to the resourceMap
    ch := make(chan result, 10)
    // These are the channels that go routine receives resource ID from
    channels := make([]chan string, numOfChannel)
    // After processing all resources, this channel is used to signal the go routines to exit
    done := make(chan struct{})

    for i := range channels {
        c := make(chan string)
        channels[i] = c

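        go func() {
            for {
                select {
                case rid := <-c:
                    u, err := generateURL(rid)
                    ch <- result{rid, u, err}
                case <-done:
                    // return, not break: a break here would only leave the select,
                    // and the goroutine would keep spinning on the closed channel
                    return
                }
            }
        }()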
    }

    go func() {
        for i, r := range resources {
            channels[i%numOfChannel] <- r
        }
    }()

    resourceMap := make(map[string]string)
    i := 0
    for p := range ch {
        if p.err != nil {
            log.Println(p.resourceID, p.err)
        } else {
            resourceMap[p.resourceID] = p.url
        }
        i++
        if i == len(resources) { // stop only after every resource has reported back
            break
        }
    }

    close(done)
}
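In a for/select worker loop, a plain break inside the select leaves only the select statement, not the for loop, so shutting a worker down on done needs return (or a labeled break). A small self-contained sketch of that shutdown; worker, run and the "url-" prefix are hypothetical stand-ins, and wg.Wait() returning is what shows that every worker really exited. It also reads exactly len(ids) results, so no result is left unread.

```go
package main

import (
	"fmt"
	"sync"
)

// worker handles IDs from in until done is closed.
func worker(in <-chan string, out chan<- string, done <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case id := <-in:
			out <- "url-" + id // hypothetical stand-in for generateURL
		case <-done:
			return // a plain break here would only exit the select
		}
	}
}

// run starts n workers, reads exactly len(ids) results, then shuts the
// workers down and waits for them. It returns the number of results read.
func run(ids []string, n int) int {
	in := make(chan string)
	out := make(chan string)
	done := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go worker(in, out, done, &wg)
	}
	go func() {
		for _, id := range ids {
			in <- id
		}
	}()
	count := 0
	for range ids { // one receive per ID
		<-out
		count++
	}
	close(done)
	wg.Wait() // returns only because every worker took the return path
	return count
}

func main() {
	fmt.Println(run([]string{"1", "2", "3", "4", "5"}, 3))
}
```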
geliba187
  • I have a question [here](https://stackoverflow.com/questions/71916607/load-data-from-reading-files-during-startup-and-then-process-new-files-and-clear) where I am using a few channels, one for data and one for errors, but I am confused about one small thing that is resulting in a bug. I could use some help there. Any help will be greatly appreciated. – dragons Apr 21 '22 at 19:07