I'm reading the pipelines tutorial online and trying to build a stage that works like this:

  1. Collects incoming events into batches of 10 and sends each batch to the out chan.
  2. If fewer than 10 events have arrived within 5 seconds, combines whatever was received, sends it, closes the out chan, and returns.

However, I have no idea what the first select case should look like. I've tried several things but couldn't get past this. Any pointers are much appreciated!

func BatchEvents(inChan <-chan *Event) <-chan *Event {
    batchSize := 10
    comboEvent := Event{}
    out := make(chan *Event)
    go func() {
        defer close(out)
        i := 0
        for event := range inChan {
            select {
            case -WHAT GOES HERE?-:
                if i < batchSize {
                    comboEvent.data = append(comboEvent.data, event.data)
                    i++;
                } else {
                    out <- &comboEvent
                    // reset for next batch
                    comboEvent = Event{}
                    i=0;
                }
            case <-time.After(5 * time.Second):
                // process whatever we have seen so far if the batch size isn't filled in 5 secs
                out <- &comboEvent
                // stop after
                return
            }
        }
    }()
    return out
}
ultimoo

1 Answer

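Instead of ranging over the channel, make the receive from that channel your first select case, and put the whole select inside an infinite loop. With range, the goroutine blocks waiting for the next event before it ever reaches the select, so the timeout case can't fire while the channel is idle.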

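func BatchEvents(inChan <-chan *Event) <-chan *Event {
    const batchSize = 10
    out := make(chan *Event)
    go func() {
        defer close(out)
        // Allocate a fresh Event per batch so the pointer already handed to
        // the receiver is never overwritten by the reset below.
        comboEvent := &Event{}
        i := 0
        for {
            select {
            case event, ok := <-inChan:
                if !ok {
                    // input closed: stop (a partial batch is not sent)
                    return
                }
                comboEvent.data = append(comboEvent.data, event.data)
                i++
                if i == batchSize {
                    out <- comboEvent
                    // reset for next batch
                    comboEvent = &Event{}
                    i = 0
                }
            case <-time.After(5 * time.Second):
                // time.After is re-armed on every loop iteration, so this
                // fires after 5 seconds without a new event.
                // process whatever we have seen so far
                if i > 0 {
                    out <- comboEvent
                }
                // stop after
                return
            }
        }
    }()
    return out
}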
Andy Schweig
    Just note that `select`s are __NOT__ ordered by priority. You have to artificially produce a prioritized set of `select`s. See https://stackoverflow.com/questions/11117382/priority-in-go-select-statement-workaround – RayfenWindspear Aug 25 '17 at 00:49
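
To illustrate the comment above: when several cases are ready at once, `select` picks one at random, so a ready event and an expired timer can be handled in either order. A common workaround is a non-blocking receive on the preferred channel first, followed by a normal select on both. This is only a minimal sketch; `recvPreferred` and the `int` channels are hypothetical names chosen for the example.

```go
package main

// recvPreferred returns a value from high if one is ready right now, and
// only otherwise blocks on both channels. The second result reports whether
// the value came from high.
func recvPreferred(high, low <-chan int) (int, bool) {
	// First pass: non-blocking attempt on the preferred channel.
	select {
	case v := <-high:
		return v, true
	default:
	}
	// Second pass: nothing was ready on high, so wait on either channel.
	select {
	case v := <-high:
		return v, true
	case v := <-low:
		return v, false
	}
}
```

In the batching loop, the same shape could be used to prefer pending events over the timeout, or the other way around, depending on which should win.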