I created the following simple program to test the fan-in-fan-out pattern using channel. What it does is generate a few go routines to calculate the square of a number coming from an input channel and send the square into an output channel. All output channels will then be merged into a single channel to print the square in main
.
func calculateSquare(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for num := range in {
fmt.Printf("Receving num %v\n", num)
out <- num * num
fmt.Printf("Sending square %v\n", num * num)
}
fmt.Println("Closing out")
close(out)
}()
return out
}
func fanOut(in <-chan int, workerCount int) []<-chan int {
outs := make([]<-chan int, 0, workerCount)
for i := 0 ; i < workerCount ; i++ {
outs = append(outs, calculateSquare(in))
}
return outs
}
func fanIn(outs []<-chan int) <-chan int {
var wg sync.WaitGroup
merge := make(chan int)
for _, out := range outs {
wg.Add(1)
go func() {
for result := range out {
merge <- result
}
wg.Done()
}()
}
go func() {
wg.Wait()
fmt.Println("Closing merge")
close(merge)
}()
return merge
}
func main() {
in := make(chan int)
go func() {
for i := 0 ; i < 4 ; i++ {
fmt.Printf("Sending num %v\n", i)
in <- i
}
close(in)
}()
outs := fanOut(in, 5)
merge := fanIn(outs)
for num := range merge {
fmt.Printf("Final square %v\n", num)
}
}
In the main
function, I'm sending in 4 numbers 0 -> 3 into the input channel and I expect to see 4 square printed in the console. However, when I ran the program, even though the output fluctuates a bit but I never ever see 4 square numbers printed in the console.
Below is a sample output I'm seeing.
Sending num 0
Sending num 1
Sending num 2
Sending num 3
Closing out
Receving num 0
Receving num 1
Receving num 2
Sending square 4
Closing out
Receving num 3
Final square 4
Closing merge
I'd be very grateful if someone could explain to me why Receving num 1
was printed but Sending square 1
is never coming. In addition, if Sending square 1
is not printed, how did the output
channel get closed. I'm only seeing 2 Closing out
, yet, the wait group where I was merging the result ended its Wait()
.
I must have done something wrong somewhere.