
I'm using this (both `symbols` and `filteredSymbols` are `[]string`):

concurrency := 5
sem := make(chan bool, concurrency)

for i := range symbols {
    sem <- true
    go func(int) {
        defer func() { <-sem }()
        rows, err := stmt.Query(symbols[i])
        if <some condition is true> {
            filteredSymbols = append(filteredSymbols, symbols[i])
        }
    }(i)
}
for i := 0; i < cap(sem); i++ {
    sem <- true
}

to limit the number of goroutines running concurrently. I need to limit it because every goroutine queries a Postgres database, and sometimes I have more than 3000 symbols to evaluate. The code analyses large amounts of financial data (stocks and other securities). I also use the same pattern to fetch OHLC and pre-calculated data from the database. Is this an idiomatic approach? I'm asking because `sync.WaitGroup` already exists, and I'm looking for a way to use it instead.

I've also observed that the method above sometimes yields different results. In one case `filteredSymbols` ended up with 1409 entries; without changing any parameters, the same code would then yield 1407, and at other times 1408. In another piece of code, a large number of results were missing.

The code below was very inconsistent, so I removed the concurrency (note that in this code I don't even need to limit concurrent goroutines, since they only use in-memory resources). Removing the concurrency fixed it:

func getCommonSymbols(symbols1 []string, symbols2 []string) (symbols []string) {
    defer timeTrack(time.Now(), "Get common symbols")
    // concurrency := len(symbols1)
    // sem := make(chan bool, concurrency)

    // for _, s := range symbols1 {
    for _, sym := range symbols1 {
        // sym := s
        // sem <- true
        // go func(string) {
        // defer func() { <-sem }()
        for k := range symbols2 {
            if sym == symbols2[k] {
                symbols = append(symbols, sym)
                break
            }
        }
        // }(sym)
    }
    // for i := 0; i < cap(sem); i++ {
    //  sem <- true
    // }
    return
}
JohnStephen.19
  • You have a race on updating filteredSymbols, you need a lock or switch to a results channel – superfell Mar 05 '18 at 06:07
  • can you please be more specific/clear with that? I'm very new to programming as a whole. Just one year in the field – JohnStephen.19 Mar 05 '18 at 06:10
  • how can I switch to a `results` channel when I need the `sem` channel to limit the number of goroutines running concurrently? I can add it, but can you please specify where `filteredSymbols = append(filteredSymbols, <-results)` should happen? Also, we need to consider that we don't know how many results will be returned – JohnStephen.19 Mar 05 '18 at 06:18
  • just saw `SizedWaitGroup` now. But still don't know the correct solution to the race in appending to `filteredSymbols` – JohnStephen.19 Mar 05 '18 at 06:22
  • after reading about `Mutex`, I now know what to do. Still, anyone, feel free to answer this thread and I will accept the best one :) Sad that I have to change a lot of code – JohnStephen.19 Mar 05 '18 at 06:48
  • See related [Is this an idiomatic worker thread pool in Go?](https://stackoverflow.com/questions/38170852/is-this-an-idiomatic-worker-thread-pool-in-go/38172204#38172204) – icza Mar 05 '18 at 07:57
  • Maybe you might take a look at some concurrency tutorials https://talks.golang.org/2012/concurrency.slide#1 and https://blog.golang.org/pipelines. – Rick-777 Mar 05 '18 at 15:24
  • and https://github.com/golang/go/wiki/LearnConcurrency – Rick-777 Mar 05 '18 at 15:25

1 Answer


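You have a data race: multiple goroutines update `filteredSymbols` at the same time, so concurrent appends can overwrite each other and some results get lost. The smallest change you can make to fix it is to add a mutex lock around the `append` call. Separately, the goroutine's `int` parameter is unnamed, so `symbols[i]` inside the closure reads the loop variable `i` itself; before Go 1.22 that variable is shared by all iterations and may already have advanced. Name the parameter (`go func(i int) { ... }(i)`) so each goroutine gets its own copy, e.g.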

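concurrency := 5
sem := make(chan bool, concurrency)
var mu sync.Mutex // guards filteredSymbols
for i := range symbols {
    sem <- true // acquire a slot; blocks while `concurrency` goroutines are running
    go func(i int) { // named parameter: each goroutine gets its own copy of i
        defer func() { <-sem }() // release the slot when this goroutine returns
        rows, err := stmt.Query(symbols[i])
        if err != nil {
            return // handle or log err as appropriate
        }
        defer rows.Close() // frees the underlying database connection
        if <some condition is true> {
            mu.Lock()
            filteredSymbols = append(filteredSymbols, symbols[i])
            mu.Unlock()
        }
    }(i)
}
// wait for the last goroutines: once every slot is filled, none are still running
for i := 0; i < cap(sem); i++ {
    sem <- true
}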

The race detector (`go run -race`, `go test -race`) could have helped you spot this as well. One common alternative is to use one channel to send work to a fixed number of worker goroutines and another channel to get the results out, something like:

concurrency := 5
workCh := make(chan string, concurrency)
resCh := make(chan string, concurrency)
var workersWg sync.WaitGroup
// start the required number of workers, use the WaitGroup to see when they're done
for i := 0; i < concurrency; i++ {
    workersWg.Add(1)
    go func() {
        defer workersWg.Done()
        for symbol := range workCh {
            // do some work, e.g. query the database for symbol;
            // cond stands for your filtering condition
            if cond {
                resCh <- symbol
            }
        }
    }()
}
go func() {
    // when all the workers are done, close resCh so the collecting loop below ends
    workersWg.Wait()
    close(resCh)
}()
// submit all the work from its own goroutine: if this loop ran here instead,
// it could block on a full workCh while the workers block on a full resCh,
// because nothing would be reading results yet (a deadlock)
go func() {
    for _, s := range symbols {
        workCh <- s
    }
    close(workCh)
}()
// collect up the results
for r := range resCh {
    filteredSymbols = append(filteredSymbols, r)
}
superfell