4

I've been trying to solve a simple problem I ran into with Go concurrency. I've searched for solutions but found nothing specific to my problem (or I might have missed one). Here's my code:

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration, num int) {

    for i:=0; i<num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch, 100*time.Millisecond, 2)
    go producer(ch, 200*time.Millisecond, 5)

    for {
        fmt.Println(<-ch)    
    }

    close(ch)
}

It prints error:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
    D:/Code/go/src/testconcurrency/main.go:23 +0xca
exit status 2

What is an efficient way to avoid this error? Thank you.

Jonathan Hall
Jake Muller
  • you can go through this [question](https://stackoverflow.com/questions/44487887/go-lang-multiplexing-all-goroutines-are-asleep-deadlock) – Himanshu Feb 19 '18 at 11:50
  • Your problem is that you "post" integers to the channel and then read them from the main goroutine. When both `producer`s end, no more integers are posted to the channel, but the main goroutine still waits for another integer, which will never arrive. Try closing the channel using `close(ch)` once it is no longer used. – user9335240 Feb 19 '18 at 11:57
  • I did the `close(ch)` – Jake Muller Feb 19 '18 at 12:07
  • You closed it **after** the place that causes the deadlock. In the infinite for loop, you keep polling the channel, waiting for any integer posted to it. After both producers end, you are still waiting for more, while nothing is produced. The `close(ch)` must be at the end of the producer (but in your case that will be difficult because you have 2 producers). – user9335240 Feb 19 '18 at 12:13
  • First, try deleting one producer and running the code; the same crash will happen. Then try moving `close(ch)` to the end of the producer routine, and it will succeed. But in the 2-producer case you may need another channel or something; you need a better design. – user9335240 Feb 19 '18 at 12:15

5 Answers

8

Your producers are "short-lived": they only send values on the channel for a finite amount of time. Your consumer, however, is an endless for loop that receives from the channel without a termination condition, and the channel is only closed after this endless loop. Once the producers stop sending values, it's a deadlock.

Channels must be closed by the producer(s), signalling that no more values will be sent on them. Since you have multiple producers that are not synchronized with each other, in general you can't tell which one will finish first, so you can't designate one to close the channel (and a channel can only be closed once; see "Why Go's channel can close twice?" and "Closing channel of unknown length").
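
To make the "closed only once" point concrete, here is a minimal sketch that closes a channel twice and recovers the resulting runtime panic. The helper name closeTwice is hypothetical and not part of the question's code.

```go
package main

import "fmt"

// closeTwice (hypothetical helper) closes ch twice and returns the
// message of the panic raised by the second close.
func closeTwice(ch chan int) (msg string) {
    defer func() {
        if r := recover(); r != nil {
            msg = fmt.Sprint(r)
        }
    }()
    close(ch)
    close(ch) // the second close panics at run time
    return "no panic"
}

func main() {
    fmt.Println(closeTwice(make(chan int)))
}
```

This is why two unsynchronized producers cannot each simply call close(ch) when they finish.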

You have to "coordinate" the producers, and when all have finished their jobs, the coordinator should close the channel.

And the consumer should use a for range on the channel, as the for range construct receives all values from the channel that were sent on it before it was closed, then it terminates automatically.
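
As a small illustration of that for range behavior (a sketch only; the drain helper and the buffered channel are assumptions made to keep the example single-goroutine):

```go
package main

import "fmt"

// drain (hypothetical helper) receives every value from ch and returns
// once ch is closed and empty.
func drain(ch <-chan int) []int {
    var got []int
    for v := range ch {
        got = append(got, v)
    }
    return got
}

func main() {
    ch := make(chan int, 3) // buffered so main can send without a receiver
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch) // values sent before the close can still be received

    fmt.Println(drain(ch)) // [1 2 3]

    // A receive from a closed, empty channel returns the zero value
    // and ok == false instead of blocking.
    v, ok := <-ch
    fmt.Println(v, ok) // 0 false
}
```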

For the coordination it is recommended to use sync.WaitGroup. Whether you use a global one in this case or a local one and you pass it to producers is up to you. Using a local will make the solution more general and easier to extend. One thing to note is that you must pass a pointer to sync.WaitGroup. Whenever you spin up a new producer, increment the waitgroup using WaitGroup.Add(). When a producer is done, it can signal this using WaitGroup.Done(), preferably using defer (so it runs no matter what, mitigating the deadlock in case of abnormal circumstances). And the controller can wait for all producers to finish using WaitGroup.Wait().

Here's a complete solution:

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()

    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int)

    wg.Add(1)
    go producer(ch, 100*time.Millisecond, 2, wg)
    wg.Add(1)
    go producer(ch, 200*time.Millisecond, 5, wg)

    go func() {
        wg.Wait()
        close(ch)
    }()

    for v := range ch {
        fmt.Println(v)
    }
}

Output (try it on the Go Playground):

0
0
1
1
2
3
4

See related question: Prevent the main() function from terminating before goroutines finish in Golang

icza
6

This problem can be solved in an elegant way using two wait groups. By closing channel ch we signal to the consumers that there is no more data.

The solution scales well with more consumers.

package main

import (
    "fmt"
    "sync"
    "time"
)

func producer(ch chan<- int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

func consumer(ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for x := range ch {
        fmt.Println(x)
    }
}

func main() {
    ch := make(chan int)
    producers := &sync.WaitGroup{}
    consumers := &sync.WaitGroup{}

    producers.Add(2)
    go producer(ch, 100*time.Millisecond, 2, producers)
    go producer(ch, 200*time.Millisecond, 5, producers)

    consumers.Add(1)
    go consumer(ch, consumers)

    producers.Wait()
    close(ch)
    consumers.Wait()
}
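
To illustrate the scaling claim, here is a hedged variation of the same two-wait-group pattern with several consumers. The run function, the atomic counter, and the omission of time.Sleep are assumptions made so the example is short and its result is easy to check.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func producer(ch chan<- int, num int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < num; i++ {
        ch <- i
    }
}

// run (hypothetical) starts two producers and numConsumers consumers,
// and returns how many values were consumed in total.
func run(numConsumers int) int64 {
    ch := make(chan int)
    var producers, consumers sync.WaitGroup
    var received int64

    producers.Add(2)
    go producer(ch, 2, &producers)
    go producer(ch, 5, &producers)

    consumers.Add(numConsumers)
    for i := 0; i < numConsumers; i++ {
        go func() {
            defer consumers.Done()
            for range ch {
                atomic.AddInt64(&received, 1)
            }
        }()
    }

    producers.Wait()
    close(ch) // every consumer's range loop ends once ch is drained
    consumers.Wait()
    return received
}

func main() {
    fmt.Println(run(3)) // 7
}
```

Closing the channel once wakes every consumer, so no per-consumer signalling is needed.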
Grzegorz Żur
3

The problem is that <-ch blocks, so if no new values are sent on the channel it will block forever. One way to fix this is to replace it with a select statement, which also blocks but can listen on multiple channels. You would also have to add an exit channel. In your example, as soon as the exit channel has received two values we can break. The break statement needs a label because we want to exit both the select and the for loop.

https://play.golang.org/p/wGdCulZDnrx
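
For readers who can't open the playground link, the idea could look roughly like the sketch below. It is not a copy of the linked code; the done channel, the collect function, and the loop label are assumptions.

```go
package main

import (
    "fmt"
    "time"
)

// producer sends num values, then reports completion on done.
func producer(ch chan<- int, done chan<- struct{}, d time.Duration, num int) {
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
    done <- struct{}{}
}

// collect receives values until `producers` completion signals arrive.
func collect(ch <-chan int, done <-chan struct{}, producers int) []int {
    var got []int
loop:
    for {
        select {
        case v := <-ch:
            got = append(got, v)
        case <-done:
            producers--
            if producers == 0 {
                break loop // a plain break would only leave the select
            }
        }
    }
    return got
}

func main() {
    ch := make(chan int)
    done := make(chan struct{})

    go producer(ch, done, 100*time.Millisecond, 2)
    go producer(ch, done, 200*time.Millisecond, 5)

    for _, v := range collect(ch, done, 2) {
        fmt.Println(v)
    }
}
```

Because ch is unbuffered, each producer's values have all been received by the time its done signal is sent, so no value is lost when the loop exits.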

Another way is to have multiple input channels and close each one as soon as its producer has finished sending. For this, each goroutine needs its own channel; otherwise we would exit when the first goroutine finishes.
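
A sketch of this second approach, under the assumption that each producer creates and closes its own channel (receiveBoth is a hypothetical name). Setting a closed channel's variable to nil disables its select case, because a receive from a nil channel never becomes ready.

```go
package main

import (
    "fmt"
    "time"
)

// producer owns its channel: it creates, fills, and closes it.
func producer(d time.Duration, num int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < num; i++ {
            ch <- i
            time.Sleep(d)
        }
    }()
    return ch
}

// receiveBoth reads from a and b until both have been closed.
func receiveBoth(a, b <-chan int) []int {
    var got []int
    for a != nil || b != nil {
        select {
        case v, ok := <-a:
            if !ok {
                a = nil // stop selecting on the closed channel
                continue
            }
            got = append(got, v)
        case v, ok := <-b:
            if !ok {
                b = nil
                continue
            }
            got = append(got, v)
        }
    }
    return got
}

func main() {
    a := producer(100*time.Millisecond, 2)
    b := producer(200*time.Millisecond, 5)
    for _, v := range receiveBoth(a, b) {
        fmt.Println(v)
    }
}
```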

A third option is to create a merge function which merges multiple channels into one. This allows moving the creation of the channels into the producers, so each channel is created, filled and closed in one location. The merge function is relatively complex, but it is kept out of the business logic and can be understood and tested separately. The main code is then reduced to just:

ch1 := producer(100*time.Millisecond, 2)
ch2 := producer(200*time.Millisecond, 5)

for i := range merge(ch1, ch2) {
    fmt.Println(i)
}

https://play.golang.org/p/2mv8ILhJPIB

The merge function is from https://blog.golang.org/pipelines
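
For reference, a self-contained sketch of this approach follows. The merge function is modeled on the one in the pipelines article; the producer body is an assumption, since the playground code is not reproduced here.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// producer creates its own channel, fills it, and closes it when done.
func producer(d time.Duration, num int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < num; i++ {
            ch <- i
            time.Sleep(d)
        }
    }()
    return ch
}

// merge fans in all input channels to one output channel, which is
// closed after every input channel has been drained.
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // One forwarding goroutine per input channel.
    wg.Add(len(cs))
    for _, c := range cs {
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(c)
    }

    // Close out once all forwarders have finished.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    ch1 := producer(100*time.Millisecond, 2)
    ch2 := producer(200*time.Millisecond, 5)

    for i := range merge(ch1, ch2) {
        fmt.Println(i)
    }
}
```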

Christian
  • I prefer the `merge` function. The blog you provided is exactly explaining about `fan-in` and `fan-out` and how to handle multiple goroutines. Thanks btw. – Jake Muller Feb 19 '18 at 18:00
3

You need to synchronize the goroutines. Your main goroutine and the producer goroutines run concurrently, so the main goroutine never knows when to stop receiving from the channel. Because it loops over the channel, it keeps trying to receive; once the producers have finished and nothing more is sent, it can never receive another value, and the program deadlocks. To avoid this, use sync.WaitGroup to synchronize the goroutines.

Here's the code:

package main

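import (
    "fmt"
    "sync"
    "time"
)

func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
    // Runs when producer returns; by convention this defer usually goes
    // at the top of the function.
    defer wg.Done()
}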

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int)

    wg.Add(2)
    go producer(ch, 100*time.Millisecond, 2, wg)
    go producer(ch, 200*time.Millisecond, 5, wg)

    go func() {
        wg.Wait()
        close(ch)
    }()

    // print the outputs
    for i := range ch {
        fmt.Println(i)
    }
}

https://play.golang.org/p/euMTGTIs83g

Hope it helps.

Since my solution looks similar to one already posted, here is my original version, from before I modified it to match the OP's question.

Here's the code:

package main

import (
    "fmt"
    "sync"
    "time"
)

// producer produces values to be sent to the consumer.
func producer(ch chan int, d time.Duration, num int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
}

// consumer forwards all values from the producers to out.
func consumer(ch chan int, out chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := range ch {
        out <- i
    }
}

// synchronizer closes each channel once its senders have finished,
// so the receiving range loops end instead of deadlocking.
func synchronizer(ch chan int, out chan int, wgp *sync.WaitGroup, wgc *sync.WaitGroup) {
    wgp.Wait()
    close(ch)
    wgc.Wait()
    close(out)
}

func main() {
    wgp := &sync.WaitGroup{}
    wgc := &sync.WaitGroup{}
    ch := make(chan int)
    out := make(chan int)

    wgp.Add(2)
    go producer(ch, 100*time.Millisecond, 2, wgp)
    go producer(ch, 200*time.Millisecond, 5, wgp)

    wgc.Add(1)
    go consumer(ch, out, wgc)

    go synchronizer(ch, out, wgp, wgc)

    // print the outputs
    for i := range out {
        fmt.Println(i)
    }
}

This uses a consumer goroutine to fan in the values from the producer goroutines; main then reads all values from the consumer's output channel.

Hope it helps.

mfathirirhas
  • Isn't this "exactly" the same as my solution (apart from you calling `wg.Add()` only once, and "misplacing" the `defer wg.Done()`)? – icza Feb 20 '18 at 11:57
  • I'm surprised that we have the same solution. I had read about sync.WaitGroup somewhere. Actually, I had written a slightly different solution than this, but I changed it to match the OP's question. I'll change it back to my own. – mfathirirhas Feb 21 '18 at 04:10
0

Simpler answer: one of the producers needs to close the channel, and the consumer can just range over the channel.

package main

import (
    "fmt"
    "time"
)

func producer(ch chan int, d time.Duration, num int, closer bool) {
    for i := 0; i < num; i++ {
        ch <- i
        time.Sleep(d)
    }
    if closer {
        close(ch)
    }
}

func main() {
    ch := make(chan int)

    go producer(ch, 100*time.Millisecond, 2, false)
    go producer(ch, 200*time.Millisecond, 5, true)

    for i := range ch {
        fmt.Println(i)
    }

}

Of course, unless you have a situation where you know which producer will always finish last, you would not want to do this in real code. Better designs are in the WaitGroup-based patterns in the other answers. But this is the simplest way for this code to avoid deadlock.
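
To show why the ordering matters, here is a minimal sketch of what happens if a producer sends after the channel has been closed. The sendAfterClose helper is hypothetical; it recovers the panic only so the example can print it.

```go
package main

import "fmt"

// sendAfterClose (hypothetical helper) closes a channel, then sends on
// it, and returns the message of the resulting panic.
func sendAfterClose() (msg string) {
    defer func() {
        if r := recover(); r != nil {
            msg = fmt.Sprint(r)
        }
    }()
    ch := make(chan int, 1)
    close(ch)
    ch <- 1 // sending on a closed channel panics at run time
    return "no panic"
}

func main() {
    fmt.Println(sendAfterClose())
}
```

In the code above this would happen if the "closer" producer finished before the other one.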

Jonah Benton
  • What happens if the producer that isn't the "closer" is still running when the closer closes the channel? This is not a very stable design. – Adrian Feb 19 '18 at 14:50
  • Of course! I gave this answer because the asker is stuck on code and this is the simplest way to get unstuck. Design comes later. – Jonah Benton Feb 19 '18 at 14:55
  • They'll be almost immediately re-stuck, though. I'm all for "one problem at a time" but solving a problem by creating a different problem seems counter-productive, especially when `sync.WaitGroup` exists to handle exactly this situation. – Adrian Feb 19 '18 at 14:56
  • Yeah, maybe. Sometimes introducing a different design is too much to grasp. Anyway, I edited my answer to clarify the goal. – Jonah Benton Feb 19 '18 at 15:03