I have code for a consumer and producer in Go. I have already asked for a code review of it here, and a good part of the idea was derived from this thread here; here is the code in the playground.
- This code has multiple producers and consumers sharing the same channel.
- This code has an error handling mechanism: if any of the workers (producer or consumer) errors out, then all the workers should be halted.
I am concerned about deadlock scenario where all consumers are shut but producer is still adding data to shared channel. To "mitigate" this I have added a context check right before adding data into the data queue - specifically Line 85 in go playground.
However, is a deadlock still possible if the producer checks context.Done() on Line 85, the context is then cancelled (causing all consumers to shut down), and only afterwards the producer tries to insert data into the queue?
If so, how can this be mitigated?
Reposting the code:
package main
import (
"context"
"fmt"
"sync"
)
// main wires up the pipeline: read feeds batches of ints to three producers,
// which fan their output into messageCh, which is drained by three consumers.
// Errors reported on errCh are collected by handleAllErrors, and the first one
// (or nil when there was none) is printed at the end.
func main() {
	arrayOfArray := [][]int{
		{1, 2, 3, 4, 5},
		{5, 4, 3, 1, 1},
		{6, 7, 8, 9},
		{1, 2, 3, 4, 5},
		{5, 4, 3, 1, 1},
		{6, 7, 18, 9},
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Always release the context, including on the error-free path where
	// handleAllErrors never calls cancel. Calling cancel twice is harmless.
	defer cancel()

	ch1 := read(ctx, arrayOfArray)

	messageCh := make(chan int)
	errCh := make(chan error)

	producerWg := &sync.WaitGroup{}
	for i := 0; i < 3; i++ {
		producerWg.Add(1)
		producer(ctx, producerWg, ch1, messageCh, errCh)
	}

	consumerWg := &sync.WaitGroup{}
	for i := 0; i < 3; i++ {
		consumerWg.Add(1)
		consumer(ctx, consumerWg, messageCh, errCh)
	}

	firstError := handleAllErrors(ctx, cancel, errCh)

	// Shutdown order matters: messageCh is closed only after every producer
	// (its only senders) has released producerWg, and errCh is closed only
	// after both WaitGroups have been released, because workers send on it.
	producerWg.Wait()
	close(messageCh)
	consumerWg.Wait()
	close(errCh)

	// firstError is closed once errCh is drained, so this receive yields the
	// first reported error, or nil if none was reported.
	fmt.Println(<-firstError)
}
// read streams the elements of arrayOfArray, in order, on the returned
// unbuffered channel. The channel is closed once every element has been sent
// or as soon as ctx is cancelled, whichever happens first.
func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
	out := make(chan []int)

	go func() {
		defer close(out)

		for _, batch := range arrayOfArray {
			select {
			case out <- batch:
				// Delivered; move on to the next batch.
			case <-ctx.Done():
				// Stop early instead of blocking on a receiver that may be gone.
				return
			}
		}
	}()

	return out
}
// producer starts a goroutine that takes batches from in and sends twice each
// element on messageCh. The goroutine exits, calling wg.Done, when in is
// closed or ctx is cancelled. errCh is where a failing producer would report
// its error; this implementation never sends on it.
func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
	// send delivers one doubled value and reports false if ctx was cancelled
	// before a receiver accepted it, so a send can never block forever.
	send := func(v int) bool {
		select {
		case <-ctx.Done():
			return false
		case messageCh <- 2 * v:
			return true
		}
	}

	go func() {
		defer wg.Done()

		for {
			var batch []int
			select {
			case <-ctx.Done():
				return
			case next, open := <-in:
				if !open {
					return
				}
				batch = next
			}

			for _, v := range batch {
				// An error could be simulated here by sending on errCh.
				if !send(v) {
					return
				}
			}
		}
	}()
}
// consumer starts a goroutine that prints every value received from
// messageCh. The goroutine exits when messageCh is closed or ctx is
// cancelled, and only then marks itself done on wg. errCh is where a failing
// consumer would report its error; this implementation never sends on it.
func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
	go func() {
		// Must be deferred: calling wg.Done() up front releases the WaitGroup
		// while this goroutine is still running, so a caller's Wait() could
		// return (and the caller could close errCh or exit) before all
		// messages have been consumed.
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case n, ok := <-messageCh:
				if !ok {
					return
				}
				fmt.Println("consumed: ", n)
				// simulating errors
				//if n == 10 {
				//	errCh <- fmt.Errorf("output error during write")
				//}
			}
		}
	}()
}
// handleAllErrors drains errCh in a background goroutine until it is closed.
// On each error it cancels ctx via cancel if ctx is not already done, and it
// forwards only the first error to the returned channel. The returned channel
// has capacity one and is closed once errCh has been closed and drained, so a
// receive on it yields the first error, or nil when no error was reported.
func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
	firstErrCh := make(chan error, 1)

	go func() {
		defer close(firstErrCh)

		reported := false
		for err := range errCh {
			// Every error keeps being received so that senders never block,
			// but cancellation only needs to be triggered while ctx is live.
			if ctx.Err() == nil {
				cancel()
			}
			if reported {
				continue
			}
			// The buffer of one guarantees this send cannot block.
			firstErrCh <- err
			reported = true
		}
	}()

	return firstErrCh
}