
The idea is to exit the outer loop from within a goroutine. I have used a channel to signal when to break the loop, and I am using the semaphore pattern to limit the number of goroutines spawned, so that I do not spawn an enormous number of goroutines while waiting for the loop to exit.

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Task struct {
    ID        int    `json:"id"`
    UserID    int    `json:"user_id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

func main() {
    var t Task
    wg := &sync.WaitGroup{}
    stop := make(chan struct{})
    sem := make(chan struct{}, 10)

    results := make(chan Task, 1)

    worker := func(i int) {
        defer wg.Done()
        defer func() { <-sem }()
        res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
            log.Fatal(err)
        }

        if i == 20 {
            close(stop)
        }
        results <- t
    }

    i := 0

outer:
    for {
        select {
        case <-stop:
            fmt.Println("I came here")
            close(sem)
            break outer
        case v := <-results:
            fmt.Println(v)
        default:
            wg.Add(1)
            sem <- struct{}{}
            go worker(i)
            i++
        }
    }
    wg.Wait()

    fmt.Println("I am done")
}

The problem right now is that I can see it enters the case where I try to break the loop, but it never reaches "I am done". The reason is probably that the workers get blocked forever trying to send on `results` once nothing receives from it. I would like to know how I can handle this effectively.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Task struct {
    ID        int    `json:"id"`
    UserID    int    `json:"user_id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

func main() {
    wg := &sync.WaitGroup{}

    sem := make(chan struct{}, 10)
    ctx, cancel := context.WithCancel(context.Background())
    var ts []Task
    //results := make(chan Task, 1)

    worker := func(i int) {
        var t Task
        defer wg.Done()
        defer func() {
            <-sem
        }()
        res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
            log.Fatal(err)
        }

        if i > 20 {
            cancel()
        }
        ts = append(ts, t)
    }

    i := 0

outer:
    for {
        select {
        case <-ctx.Done():
            break outer
        default:
            wg.Add(1)
            sem <- struct{}{}
            go worker(i)
            i++
        }
    }
    wg.Wait()

    fmt.Println(ts)
}

This works, but I end up with duplicate entries in the array, which I want to avoid.

Edit: @Davud's solution works; however, I am still interested in how to further optimize and limit the number of goroutines spawned. Currently, the number of extra goroutines spawned equals the buffer size of `sem`, which I somehow want to reduce while still keeping it concurrent.
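For reference, a worker pool caps the number of goroutines at the pool size up front, instead of letting the loop spawn one goroutine per job behind a semaphore. A minimal sketch, with a stub `fetchTask` standing in for the HTTP call and the function names being my own, not from the question:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fetchTask stands in for the HTTP GET in the question; here it just
// echoes the ID so the pool logic can be run without a network.
func fetchTask(id int) int { return id }

// runPool starts exactly `workers` goroutines that pull IDs from jobs.
// No extra goroutines are ever spawned: the pool size is fixed up front.
func runPool(workers, limit int) []int {
	jobs := make(chan int)
	results := make(chan int)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs {
				results <- fetchTask(id)
			}
		}()
	}

	// Close results once every worker has returned, so the collector loop ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Feed exactly `limit` jobs, then close jobs to stop the workers.
	go func() {
		for i := 1; i <= limit; i++ {
			jobs <- i
		}
		close(jobs)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(runPool(10, 20))
}
```

Closing `jobs` after the desired number of sends replaces the `stop`/`cancel` signalling entirely, so there is no race over when to stop.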

Gopherine
  • You stop receiving from the result channel, so any goroutines except one that are still in flight cannot send and thus never call wg.Done(). Drain the result channel in a goroutine just before calling Wait. – Peter Jul 29 '22 at 08:44
  • Yes @Peter, thanks for the response; that is exactly why I tried the second approach, but then I end up with two identical entries in the `ts` array. Is there a way to handle that? – Gopherine Jul 29 '22 at 09:01
  • You call `cancel()` when `i == 20`, but the launched goroutines run concurrently, so this is "completely" non-deterministic. Instead you should use a worker pool to execute the tasks. See [Is this an idiomatic worker thread pool in Go?](https://stackoverflow.com/questions/38170852/is-this-an-idiomatic-worker-thread-pool-in-go/38172204#38172204) – icza Aug 01 '22 at 14:10
  • Select for multiple channels, one of which is for exit. Send to the exit channel from another goroutine. – Eric Aug 04 '22 at 14:28

2 Answers


It happens because once the stop signal is received and the for loop exits, you are no longer receiving from and printing the results, so the `results` channel blocks the remaining workers from completing.

As a solution, you can listen to the results channel in a separate goroutine.

Here I removed the `case v := <-results: fmt.Println(v)` and added a goroutine that drains the channel. Try it out:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
)

type Task struct {
    ID        int    `json:"id"`
    UserID    int    `json:"user_id"`
    Title     string `json:"title"`
    Completed bool   `json:"completed"`
}

func main() {
    wg := &sync.WaitGroup{}
    stop := make(chan struct{})
    sem := make(chan struct{}, 10)

    results := make(chan Task, 1)

    worker := func(i int) {
        var t Task // local to each worker, so workers don't overwrite each other
        defer wg.Done()
        defer func() { <-sem }()
        res, err := http.Get(fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i))
        if err != nil {
            log.Fatal(err)
        }
        defer res.Body.Close()
        if err := json.NewDecoder(res.Body).Decode(&t); err != nil {
            log.Fatal(err)
        }

        if i == 20 {
            close(stop)
        }
        results <- t
    }

    i := 0

    go func() {
        for v := range results {
            fmt.Println(v)
        }
    }()
outer:
    for {
        select {
        case <-stop:
            fmt.Println("I came here")
            close(sem)
            break outer
        default:
            wg.Add(1)
            sem <- struct{}{}
            go worker(i)
            i++
        }
    }
    wg.Wait()

    fmt.Println("I am done")
}
Davud Safarov
  • It would be nice if you could add a solution with maybe a worker pool as well, because this still makes more calls than I need (right now it makes 30). Is there a way to limit it while still maintaining concurrency? I tried, but I was not able to properly implement the worker pool pattern here. Thanks though – Gopherine Aug 05 '22 at 07:17

It seems that the problem in the second solution is that the workers share `var t Task`. Multiple workers assign values to it, but since it can only hold one value, the workers overwrite each other's values before `append(ts, t)` is called. When `append` is finally called by different workers, the last value assigned to `t` is appended multiple times to `ts`: the workers call `append` while `t` no longer holds their value, hence the duplicates. It's a data race/race condition.

Solution: Move `var t Task` inside the worker so that it's no longer shared.
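For completeness: moving `t` inside the worker fixes the duplicate values, but `ts = append(ts, t)` from multiple goroutines is itself a data race on the slice, so it also needs a mutex (or a channel). A minimal sketch, with a dummy `ID` assignment standing in for the HTTP fetch and decode:

```go
package main

import (
	"fmt"
	"sync"
)

type Task struct {
	ID int `json:"id"`
}

// collect runs n workers, each with its own local Task, and guards the
// shared slice with a mutex so the appends don't race either.
func collect(n int) []Task {
	var (
		wg sync.WaitGroup
		mu sync.Mutex
		ts []Task
	)
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var t Task // local to this goroutine: no sharing, no overwrites
			t.ID = i   // stands in for decoding the HTTP response
			mu.Lock()
			ts = append(ts, t)
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return ts
}

func main() {
	fmt.Println(len(collect(5))) // prints 5
}
```

Running with `go run -race` confirms the difference: the shared-`t` version reports a race, this one does not.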

volkit