0

I created the following simple program to test the fan-in-fan-out pattern using channel. What it does is generate a few go routines to calculate the square of a number coming from an input channel and send the square into an output channel. All output channels will then be merged into a single channel to print the square in main.

func calculateSquare(in <-chan int) <-chan int {
    out := make(chan int)

    go func() {
       for num := range in {
           fmt.Printf("Receving num %v\n", num)
           out <- num * num
           fmt.Printf("Sending square %v\n", num * num)
       }
       fmt.Println("Closing out")
       close(out)
    }()

    return out
}

func fanOut(in <-chan int, workerCount int) []<-chan int {
    outs := make([]<-chan int, 0, workerCount)

    for i := 0 ; i < workerCount ; i++ {
        outs = append(outs, calculateSquare(in))
    }

    return outs
}

func fanIn(outs []<-chan int) <-chan int {
    var wg sync.WaitGroup

    merge := make(chan int)

    for _, out := range outs {
        wg.Add(1)

        go func() {
            for result := range out {
                merge <- result
            }

            wg.Done()
        }()
    }

    go func() {
        wg.Wait()
        fmt.Println("Closing merge")
        close(merge)
    }()

    return merge
}

func main() {
    in := make(chan int)

    go func() {
        for i := 0 ; i < 4 ; i++ {
            fmt.Printf("Sending num %v\n", i)
            in <- i
        }
        close(in)
    }()

    outs := fanOut(in, 5)
    merge := fanIn(outs)

    for num := range merge {
        fmt.Printf("Final square %v\n", num)
    }
}

In the main function, I'm sending in 4 numbers 0 -> 3 into the input channel and I expect to see 4 square printed in the console. However, when I ran the program, even though the output fluctuates a bit but I never ever see 4 square numbers printed in the console.

Below is a sample output I'm seeing.

Sending num 0
Sending num 1
Sending num 2
Sending num 3
Closing out
Receving num 0
Receving num 1
Receving num 2
Sending square  4
Closing out
Receving num 3
Final square 4
Closing merge

I'd be very grateful if someone could explain to me why Receving num 1 was printed but Sending square 1 is never coming. In addition, if Sending square 1 is not printed, how did the output channel get closed. I'm only seeing 2 Closing out, yet, the wait group where I was merging the result ended its Wait().

I must have done something wrong somewhere.

Mr.J4mes
  • 9,168
  • 9
  • 48
  • 90
  • 5
    run `go vet` (or run any test, which in turn will run `go vet`), and you will see the problem: https://golang.org/doc/faq#closures_and_goroutines – JimB Aug 06 '21 at 14:00
  • @JimB thank you for the pointer. I read about this common mistake a few days back yet I still make it. What a newbie mistake :D – Mr.J4mes Aug 06 '21 at 14:11

2 Answers2

3

To fix:

for _, out := range outs {
    wg.Add(1)

    out := out // <- add this

Why?

https://golang.org/doc/effective_go is an excellent resource and covers the exact closure bug (that @JimB mentioned) towards the end of the channels section:

It may seem odd to write

req := req

but it's legal and idiomatic in Go to do this. You get a fresh version of the variable with the same name, deliberately shadowing the loop variable locally but unique to each goroutine.

colm.anseo
  • 19,337
  • 4
  • 43
  • 52
2

your issue is in the code below, for loop in fanIn function.

    for _, out := range outs {
        wg.Add(1)

        go func() {
            for result := range out {
                merge <- result
            }

            wg.Done()
        }()
    }

Reason for this is you using out iterator variable in gofunc, when gofunc going to use it, loop is gone to it's end.

This is describe in go/wiki/CommonMistakes under the sub topic Using goroutines on loop iterator variables

For more example - read this

corrected loop should be as below,

    for _, out := range outs {
        wg.Add(1)

        go func(c <- chan int) {
            for result := range c {
                merge <- result
            }

            wg.Done()
        }(out)
    }
nipuna
  • 3,697
  • 11
  • 24