1

say I have a case of reader, manipulator, consumer in different routines:

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "github.com/pkg/errors"
)

func Reader(ctx context.Context, chanFromReader chan int) error {
    defer close(chanFromReader)
    for i := 0; i < 100; i++ {
        select {
        case <-ctx.Done():
            return nil
        case chanFromReader <- i:
        }
    }
    return nil
}

func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
    defer close(chanToWriter)
    for {
        select {
        case <-ctx.Done():
            return nil
        case x, ok := <-chanFromReader:
            if !ok {
                return nil
            }
            chanToWriter <- 2 * x
        }
    }
}

func Writer(ctx context.Context, chanToWriter chan int) error {
    for {
        select {
        case <-ctx.Done():
            return nil
        case x, ok := <-chanToWriter:
            if !ok {
                return nil
            }
            fmt.Println("Writer: ", x)
            if x == 10 {
                return errors.New("Generate some error in writer")
            }
        }
    }
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())
    chanFromReader := make(chan int)
    chanToWriter := make(chan int)

    func(ctx context.Context, chanToWriter chan int) {
        g.Go(func() error {
            return Writer(ctx, chanToWriter)
        })
    }(ctx, chanToWriter)

    func(ctx context.Context, chanFromReader chan int, chanToWriter chan int) {
        g.Go(func() error {
            return Manipulate(ctx, chanFromReader, chanToWriter)
        })
    }(ctx, chanFromReader, chanToWriter)

    func(ctx context.Context, chanFromReader chan int) {
        g.Go(func() error {
            return Reader(ctx, chanFromReader)
        })
    }(ctx, chanFromReader)

    g.Wait()
    fmt.Println("Main wait done")
}

https://play.golang.org/p/whslVE3rzel

In case the writer fails for some reason, I'm having trouble aborting the rest of the routines. In the example above for instance, though they listen on ctx for cancellation they still deadlock on case of fail in writer, is there a workaround this?

I thought of adding this:

func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
    defer close(chanToWriter)
    for {
        select {
        case <-ctx.Done():
            return nil
        case x, ok := <-chanFromReader:
            if !ok {
                return nil
            }
            select {
            case <-ctx.Done():
                return nil
            case chanToWriter <- 2 * x:
            }
        }
    }
}

which solves it, but it looks so unclean...

O. San
  • 1,107
  • 3
  • 11
  • 25
  • why does it look so unclean ? I added this your code https://stackoverflow.com/a/27398062/4466350 => https://play.golang.org/p/7eGp9navmxi –  Aug 13 '21 at 17:26
  • but i would consider not aborting the stream. Just block for the destination to become available again. –  Aug 13 '21 at 17:28
  • I wouldn't call your proposed solution unclean as the inner channel operation starts another communication, so you'll have to select over it in order to break the wait. Once select chooses a statement, it proceeds to execute the respective case statement. In this example, since the case statement is also performing a communication `chanToWriter <- 2 * x` and there is no logic to look for cancellation signal (plus, there are no routine listening on this blocked channel), its resulting in a deadlock. – Kishore Bandi Aug 13 '21 at 19:37

1 Answers1

1

I would propose a solution where each channel gets closed only by the code that creates it. This can be enforced by returning a receive-only channel from the function that creates the channel and is responsible for closing it:

(kudos to mh-cbon for further refining this:)

https://play.golang.org/p/Tq4OVW5sSP4

package main

import (
    "context"
    "fmt"
    "log"
    "sync"
)

func read(ctx context.Context) (<-chan int, <-chan error) {
    ch := make(chan int)
    e := make(chan error)

    go func() {
        defer close(e)
        defer close(ch)

        for i := 0; i < 12; i++ {
            select {
            case <-ctx.Done():
                return
            case ch <- i:
            }
        }
    }()
    return ch, e
}

func manipulate(in <-chan int) (<-chan int, <-chan error) {
    ch := make(chan int)
    e := make(chan error)

    go func() {
        defer close(e)
        defer close(ch)

        for n := range in {
            ch <- 2 * n
        }
    }()
    return ch, e
}

func write(in <-chan int) <-chan error {
    e := make(chan error)
    go func() {
        defer close(e)
        for n := range in {
            fmt.Println("written: ", n)
            if n == 10 {
                e <- fmt.Errorf("output error during write")
            }
        }
    }()
    return e
}

func collectErrors(errs ...<-chan error) {
    var wg sync.WaitGroup
    for i := 0; i < len(errs); i++ {
        wg.Add(1)
        go func(errs <-chan error) {
            defer wg.Done()
            for err := range errs {
                log.Printf("%v", err)
            }
        }(errs[i])
    }
    wg.Wait()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    ch1, err1 := read(ctx)
    ch2, err2 := manipulate(ch1)
    err3 := write(ch2)

    collectErrors(err1, err2, err3)
    fmt.Println("main wait complete")
}

This way, each channel gets closed reliably, and the I/O errors from write will cause the child context to be cancelled, shutting down the other goroutines.

  • `manipulate` can still block at `ch <- 2 * n` thus, even if you reach to `cancel()` because `err != nil`, there is still a leaking routine when the program reaches at `fmt.Println("main wait complete")`. It might also happen that the exit signal happens when the program is about to hit `case <-ctx.Done():` in `manipulate`, in such case, the routine exits, iindeed. –  Aug 14 '21 at 16:41
  • always `defer cancel()` after `ctx, cancel := context.WithCancel(context.Background())` –  Aug 14 '21 at 16:50
  • 1
    carefully check for channel readyness when selecting over them, otherwise, suffer consequences of reading the default value in `case n := <-in:`, consider `case n, ok := <-in:`. see https://play.golang.org/p/9h0CN3AAHJR –  Aug 14 '21 at 16:54
  • @mh-cbon thank you for that feedback - I believe I've corrected the answer, to address those comments, as well as better demonstrate that the solution does not deadlock, and does not leak resources. – Neal McConachie Aug 14 '21 at 17:17
  • those last loops are not great. Consider using sync.WaitGroup –  Aug 14 '21 at 17:19
  • and more generally speaking, you could do context cancellation on the source only. The signaling would close in cascade gracefully. See https://play.golang.org/p/t2ofS8Qg-DF –  Aug 14 '21 at 17:23
  • @mh-cbon thanks again - I've updated the answer with what you've written there. (I just added stronger indications that the error channels are receive-only). – Neal McConachie Aug 14 '21 at 17:36
  • 1
    to summarize, dont abruptly exit from the streams middle/end points, this is literally obstructing them, instead, prefer gracefully shutting down them by applying cascaded channel close. –  Aug 14 '21 at 17:38
  • >.< Unfortunately this does not compile https://play.golang.org/p/UCcS32r5CMZ / https://play.golang.org/p/VZkZtLmIuwP –  Aug 14 '21 at 23:12