I have a channel that stores received data. I want to process that data when either of the following conditions is met:
1. The channel reaches its capacity.
2. A timer fires, measured from the last time the data was processed.

I saw the post "Golang - How to know a buffered channel is full".

Update:

Inspired by that post and OneOfOne's advice, I wrote this version on the Go Playground:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

var c chan int
var timer *time.Timer

const (
    capacity     = 5
    timerDuration = 3
)

func main() {
    c = make(chan int, capacity)
    timer = time.NewTimer(time.Second * timerDuration)
    go checkTimer()
    go sendRecords("A")
    go sendRecords("B")
    go sendRecords("C")

    time.Sleep(time.Second * 20)
}

func sendRecords(name string) {
    for i := 0; i < 20; i++ {
        fmt.Println(name+" sending record....", i)
        sendOneRecord(i)
        interval := time.Duration(rand.Intn(500))
        time.Sleep(time.Millisecond * interval)
    }
}

func sendOneRecord(record int) {
    select {
    case c <- record:
    default:
        fmt.Println("channel is full !!!")
        process()
        c <- record
        timer.Reset(time.Second * timerDuration)
    }
}

func checkTimer() {
    for {
        select {
        case <-timer.C:
            fmt.Println("3s timer ----------")
            process()
            timer.Reset(time.Second * timerDuration)
        }
    }
}

func process() {
    for i := 0; i < capacity; i++ {
        fmt.Println("process......", <-c)
    }
}

This seems to work fine, but I have one concern: I want to block other goroutines from writing to the channel while process() is running. Is the code above capable of doing that, or should I add a mutex at the beginning of the process method?

Any elegant solution?

seaguest
  • If this is a performance optimization, I would just `for task := range c {...}` in the processor(s). The scheduler will make your senders block when the channel buffer fills, allowing time for the processor(s) to run. It tends to work out okay. If there's another reason you want to control when what runs, you might get better answers by explaining it in a new question. – twotwotwo Jul 17 '16 at 05:27

2 Answers

As was mentioned by @OneOfOne, select is really the only way to check if a channel is full.

If you are using the channel to effect batch processing, you could always create an unbuffered channel and have a goroutine pull items and append to a slice.

When the slice reaches a specific size, process the items.

Here's an example on the Go Playground:

package main

import (
    "fmt"
    "sync"
    "time"
)

const BATCH_SIZE = 10

func batchProcessor(ch <-chan int) {
    batch := make([]int, 0, BATCH_SIZE)
    for i := range ch {
        batch = append(batch, i)
        if len(batch) == BATCH_SIZE {
            fmt.Println("Process batch:", batch)
            time.Sleep(time.Second)
            batch = batch[:0] // trim back to zero size
        }
    }
    fmt.Println("Process last batch:", batch)
}
func main() {
    var wg sync.WaitGroup
    ch := make(chan int)
    wg.Add(1)
    go func() {
        batchProcessor(ch)
        wg.Done()
    }()
    fmt.Println("Submitting tasks")
    for i := 0; i < 55; i++ {
        ch <- i
    }
    close(ch)
    wg.Wait()
}
David Budworth

No, select is the only way to do it:

func (t *T) Send(v *Val) {
    select {
    case t.ch <- v:
    default:
        // handle v directly
    }
}
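
For anyone who wants to run the pattern above, here is a self-contained sketch using a plain chan int. The helper name trySend is hypothetical and not part of any library; it returns false when the send would have blocked.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a non-blocking send and
// reports whether the value was accepted by the channel.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full, or no receiver is ready on an
		// unbuffered channel.
		return false
	}
}

func main() {
	ch := make(chan int, 2)
	for i := 0; i < 3; i++ {
		fmt.Println(i, trySend(ch, i))
	}
}
```

Keep in mind that the result is only a snapshot: with other goroutines sending or receiving concurrently, the channel may stop being full right after trySend returns false.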
OneOfOne