4

I need to fetch responses from multiple goroutines and put them into an array. I know that channels could be used for this; however, I am not sure how to make sure that all goroutines have finished processing their results. That is why I am using a WaitGroup.

Code

// main starts one getInt32 goroutine per broker and blocks until all of them
// have finished.
//
// NOTE: wg and e are not declared in this snippet, and the loop variable
// broker is never handed to the goroutine. Because getInt32 is launched with a
// bare go statement, its (int32, error) return values are discarded, so
// results stays empty.
func main() {
  log.Info("Collecting ints")
  var results []int32
  for _, broker := range e.BrokersByBrokerID {
    wg.Add(1)
    go getInt32(&wg)
  }
  // Block until every goroutine has called wg.Done.
  wg.Wait()
  log.Info("Collected")
}

// getInt32 opens the broker connection and returns a fixed sample value, or
// an error when the connection cannot be opened. An already-open connection
// (sarama.ErrAlreadyConnected) is not treated as a failure.
//
// wg.Done is called on every return path. broker and config are not declared
// in this snippet.
func getInt32(wg *sync.WaitGroup) (int32, error) {
  defer wg.Done()

  // Shows that this function may yield only an error and no int32.
  if err := broker.Open(config); err != nil && err != sarama.ErrAlreadyConnected {
    return 0, fmt.Errorf("Cannot connect to broker '%v': %s", broker.ID(), err)
  }
  defer broker.Close()

  return 1003, nil
}

My question

How can I put all the int32 responses (each call may return an error instead) into my int32 array, while making sure that all goroutines have finished their work and returned either an error or an int?

icza
  • 389,944
  • 63
  • 907
  • 827
kentor
  • 16,553
  • 20
  • 86
  • 144

5 Answers5

11

If you don't process the return values of the function launched as a goroutine, they are discarded. See What happens to return value from goroutine.

You may use a slice to collect the results, where each goroutine could receive the index to put the results to, or alternatively the address of the element. See Can I concurrently write different slice elements. Note that if you use this, the slice must be pre-allocated and only the element belonging to the goroutine may be written, you can't "touch" other elements and you can't append to the slice.

Or you may use a channel, on which the goroutines send values that include the index or ID of the item they processed, so the collecting goroutine can identify or order them. See How to collect values from N goroutines executed in a specific order?

If processing should stop on the first error encountered, see Close multiple goroutine if an error occurs in one in go

Here's an example of what it could look like when using a channel. Note that no WaitGroup is needed here, because we know that we expect exactly as many values on the channel as goroutines we launch.

// result is the value each goroutine sends back on the channel: the task it
// handled, the computed data, and the error if the task failed.
type result struct {
    task, data int32
    err        error
}

// main launches one calcTask goroutine per task and then receives exactly
// len(tasks) results from the shared channel. Results are stored in arrival
// order, which need not match the order of tasks.
func main() {
    tasks := []int32{1, 2, 3, 4}
    ch := make(chan result)

    for i := range tasks {
        go calcTask(tasks[i], ch)
    }

    // Every goroutine sends exactly one value, so counting the receives is
    // enough to know when all of them are done.
    results := make([]result, 0, len(tasks))
    for len(results) < len(tasks) {
        results = append(results, <-ch)
    }

    fmt.Printf("Results: %+v\n", results)
}

// calcTask sends exactly one result for task on ch. Tasks greater than 2
// simulate a failure and carry only an error; all other tasks simulate
// success and carry task*2 as data.
func calcTask(task int32, ch chan<- result) {
    res := result{task: task}
    if task > 2 {
        // Simulated failure: data stays at its zero value.
        res.err = fmt.Errorf("task %v failed", task)
    } else {
        // Simulated success: err stays nil.
        res.data = task * 2
    }
    ch <- res
}

Output (try it on the Go Playground):

Results: [{task:4 data:0 err:0x40e130} {task:1 data:2 err:<nil>} {task:2 data:4 err:<nil>} {task:3 data:0 err:0x40e138}]
icza
  • 389,944
  • 63
  • 907
  • 827
  • 1
    It should be noted that if you are using a `slice`, uncareful `append` to the `slice` would cause data race. Thus I would recommend an `array` in this case. – leaf bebop Feb 05 '19 at 12:45
  • @leafbebop Yes, if using a slice, it must be pre-allocated, and only element at a given index may be written in the goroutine. – icza Feb 05 '19 at 12:46
  • Do you mind editing my given code sample so that it works with your suggested channel approach? I am a bit confused how I'd do so because of the waitgroup. I believe having a code solution would be the easiest to understand as I am not sure if I should use a waitgroup at all in your proposed solution – kentor Feb 05 '19 at 13:02
  • @kentor Waitgroup can be used to wait a set of goroutines to finish their work. If you use a channel and you know how many values you expect on it, you don't even need a WaitGroup, you can just read `n` values from the channel and then you know you're done. – icza Feb 05 '19 at 13:04
  • I believe I don't know how many values I can expect to be returned through the channel because each goroutine may just fail and therefore doesn't return any value – kentor Feb 05 '19 at 13:05
  • 1
    @kentor Goroutines that "fail" should also return a value, so yes, you expect exactly as many values as goroutines you launch. The values sent on the channel should be struct values, wrapping the 2 return values of your current function. So even if an error should be returned, that would still be a value to be sent on the channel. – icza Feb 05 '19 at 13:06
  • I believe the current approach assumes that my getInt32() method always puts just one int32 (instead of an unknown number of int32) into the channel. What would it look like if the function is supposed to send n int32 into the channel? Thanks for all the help! – kentor Feb 05 '19 at 15:40
  • @kentor You could change the `result` type to have a `data` field that is not a single `int32` but a slice, e.g. `[]int32`. – icza Feb 06 '19 at 09:19
6

I also believe you have to use a channel; it could look something like this:

package main

import (
    "fmt"
    "log"
    "sync"
)

// BrokersByBrokerID holds the sample broker IDs; main starts one goroutine
// per entry.
var BrokersByBrokerID = []int32{1, 2, 3}

// result is the value a worker sends back: data on success, err on failure.
type result struct {
    // err is a plain string only to keep the example short; you must use the
    // error type here in real code.
    data, err string
}

// main starts one getInt32 goroutine per broker, gathers every result sent on
// ch, and logs them.
//
// The results are collected in main itself, and the channel is closed by a
// helper goroutine once all producers are done. This way results is only
// touched by one goroutine and is complete when it is logged; appending from
// a separate, unsynchronised collector goroutine would race with the final
// read and could miss the last value.
func main()  {
    var wg sync.WaitGroup
    ch := make(chan result)

    for _, broker := range BrokersByBrokerID {
        wg.Add(1)
        go getInt32(ch, &wg, broker)
    }

    // Close ch after the last producer finished so the range loop below ends.
    go func() {
        wg.Wait()
        close(ch)
    }()

    results := make([]result, 0, len(BrokersByBrokerID))
    for v := range ch {
        results = append(results, v)
    }

    log.Printf("collected %v", results)
}

// getInt32 sends exactly one result for broker on ch and marks itself done on
// wg when it returns. Broker 1 simulates a failure and reports only an error
// text; every other broker reports a success message.
func getInt32(ch chan result, wg *sync.WaitGroup, broker int32) {
    defer wg.Done()

    var out result
    switch broker {
    case 1:
        out.err = "error: gor broker 1"
    default:
        out.data = fmt.Sprintf("broker %d - ok", broker)
    }
    ch <- out
}

Result will look like this:

2019/02/05 15:26:28 collected [{broker 3 - ok } {broker 2 - ok } { error: gor broker 1}]
cn007b
  • 16,596
  • 7
  • 59
  • 74
  • 1
    I didn't think about creating a go routine for consuming the results. Thanks for that! I assume there's no mutex required because just one go routine writes into the array? – kentor Feb 05 '19 at 16:05
  • Aren't there any more higher level ways to do this? – Alper Sep 13 '19 at 09:00
  • 1
    Great response, but be careful with the go routine that appends values from channel to the results variable, since you are not using any sync.WaitGroup on the channel, you ***may*** end up with an incomplete array. To make sure that you got the full array, add another sync.Waitgroup just for the go routine that iterates through the channel variable (closing it properly). – Edenshaw Mar 27 '22 at 14:53
1
package main

import (
    "fmt"
    "log"
    "sync"
)

// BrokersByBrokerID holds the sample broker IDs; main starts one goroutine
// per entry.
var BrokersByBrokerID = []int{1, 2, 3, 4}

// result pairs a data string with an error text. It is not referenced by main
// in this snippet.
type result struct {
    // err is a plain string only to keep the example short; you must use the
    // error type here in real code.
    data, err string
}

// main starts one goroutine per broker ID, receives exactly one value from
// each of them, and logs the collected values in arrival order.
//
// Each goroutine sends exactly once, so counting the receives is sufficient to
// know that every value has arrived. No extra "done" channel is used: a
// goroutine that sends on such a channel after main has stopped receiving
// would block forever and make wg.Wait deadlock.
func main() {
    var wg sync.WaitGroup
    ch := make(chan int)

    for _, broker := range BrokersByBrokerID {
        wg.Add(1)

        go func(i int) {
            defer wg.Done()
            ch <- i
        }(broker)
    }

    // One receive per launched goroutine; the count follows the input instead
    // of being hard-coded.
    results := make([]int, 0, len(BrokersByBrokerID))
    for range BrokersByBrokerID {
        results = append(results, <-ch)
    }

    fmt.Println("STOPPED")
    // All sends have completed; this only waits for the goroutines to return.
    wg.Wait()

    log.Printf("collected %v", results)
}
0

Thanks to cn007b and Edenshaw; my answer is based on theirs. As Edenshaw commented, you need another sync.WaitGroup for the goroutine that reads the results from the channel, or you may get an incomplete array.

package main

import (
    "fmt"
    "sync"

    "encoding/json"
)

// Resp is the value each worker goroutine produces; id is the index of the
// worker that filled it in.
type Resp struct{ id int }

// main runs three worker goroutines that each send a *Resp on chanRes, gathers
// the ids in a separate collector goroutine, and prints them as a JSON array.
//
// Two WaitGroups are used: wg tracks the workers so the channel can be closed
// once they are done, and wg2 tracks the collector so res is read only after
// the collector has drained the channel.
func main() {
    const workers = 3

    var wg sync.WaitGroup
    // Typed channel: no interface boxing and no type assertion on receive.
    chanRes := make(chan *Resp, workers)
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            chanRes <- &Resp{id: i}
        }(i)
    }

    // Non-nil slice so that an empty result still encodes as [] and not null.
    res := make([]int, 0, workers)

    var wg2 sync.WaitGroup
    wg2.Add(1)
    go func() {
        defer wg2.Done()
        for v := range chanRes {
            res = append(res, v.id)
        }
    }()

    wg.Wait()
    close(chanRes)
    wg2.Wait()

    resStr, err := json.Marshal(res)
    if err != nil {
        // Report the failure instead of silently printing an empty line.
        fmt.Println("marshal results:", err)
        return
    }
    fmt.Println(string(resStr))
}
JOE yue
  • 1
  • 2
0
package main

import (
    "fmt"
    "log"
    "sync"
    "time"
)

// BrokersByBrokerID holds the sample broker IDs; main starts one goroutine
// per entry.
var BrokersByBrokerID = []int{1, 2, 3, 4}

// result pairs a data string with an error text. It is not referenced by main
// in this snippet.
type result struct {
    // err is a plain string only to keep the example short; you must use the
    // error type here in real code.
    data, err string
}

func main() {    
    var wg sync.WaitGroup.   
    var results []int  
    ch := make(chan int)  
    done := make(chan bool) 
    for _, broker := range BrokersByBrokerID {                      
       wg.Add(1)

        go func(i int) {
            defer wg.Done()
            ch <- i
            if i == 4 {
                done <- true
            } 

        }(broker)
    }
    
    for v := range ch {

        results = append(results, v)
        if len(results) == 4 {
            close(ch)
        }

    }

    fmt.Println("STOPPED")
    <-done
    wg.Wait()
    
    log.Printf("collected %v", results)


}


</pre>