
This is doing my head in, I can't figure out how to solve it:

  • I want a fixed number N of goroutines running in parallel.
  • From a never-ending queue I will fetch X messages about jobs to process.
  • I want the N goroutines to process these X jobs, and as soon as one of them has nothing more to do, I want to fetch another X jobs from the never-ending queue.

The code in the answer below (see url) works brilliantly to process the tasks, but the workers die once the task list is empty. I want them to stay alive and somehow notify the main code that they are out of work, so I can fetch more jobs to fill the task list with.

How would you define a pool of goroutines to be executed at once in Golang?

Using user Jsor's example code from below, I tried to create a simple program, but I am confused.

import (
    "fmt"
    "strconv"
)

//workChan - read only that delivers work
//requestChan - ??? what is this
func Worker(myid string, workChan <-chan string, requestChan chan<- struct{}) {
    for {
        select {
        case work := <-workChan:
            fmt.Println("Channel: " + myid + " do some work: " + work)
        case requestChan <- struct{}{}:
            //hm? how is the requestChan used?
        }
    }
}

func Logic() {

    workChan := make(chan string)
    requestChan := make(chan struct{})

    //Create the workers
    for i := 1; i < 5; i++ {
        go Worker(strconv.Itoa(i), workChan, requestChan)
    }

    //Give the workers some work
    for i := 100; i < 115; i++ {
        workChan <- "workid" + strconv.Itoa(i)
    }

}
Susan D. Taylor
2 Answers


This is what the select statement is for.

func Worker(workChan <-chan Work, requestChan chan<- struct{}) {
    for {
        select {
        case work := <-workChan:
            // Do work
        case requestChan <- struct{}{}:
        }
    }
}

This worker will run forever and ever. If work is available, it will pull it from the work channel. If there's nothing left, it will send a request.

Note that since it runs forever and ever, if you want to be able to kill a worker you need to do something else. One possibility is to always check the second `ok` value when receiving from workChan, and quit the function if that channel is closed. Another option is to use an individual quit channel for each worker.
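A minimal sketch of that first possibility, quitting when the work channel is closed (the helper name `runPool` and the processed-count bookkeeping are purely illustrative, not part of the answer):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runPool starts n workers that drain a job channel and quit when it is
// closed, returning how many jobs were processed. (Illustrative helper,
// not from the original answer.)
func runPool(jobs []string, n int) int {
	ch := make(chan string)
	var processed int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				_, ok := <-ch
				if !ok { // channel closed: no more work will ever arrive
					return
				}
				atomic.AddInt64(&processed, 1)
			}
		}()
	}
	for _, j := range jobs {
		ch <- j
	}
	close(ch) // signals every worker to quit
	wg.Wait() // blocks until all workers have returned
	return int(processed)
}

func main() {
	fmt.Println("processed:", runPool([]string{"a", "b", "c", "d"}, 3))
	// prints "processed: 4"
}
```

The receive loop can also be written as `for range ch { … }`, which exits automatically once the channel is closed and drained.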

Linear
  • Also note that a `case` statement inside a `select` becomes non-blocking if the corresponding channel is closed. – tike Mar 24 '14 at 11:12
  • thanks! I am still puzzled how it works; What is the requestChan and how is that one used? (see my sample program above) – Susan D. Taylor Mar 25 '14 at 10:30

Compared to the other solution you posted, you first need not to close the channel; just keep feeding items into it.

Then you need to answer the following question: is it absolutely necessary that (a) you fetch the next X items from your queue only once one of the workers has “nothing more to do” (or, what is the same, once the first X items are either fully processed, or assigned to a worker); or (b) is it okay if you keep the second set of X items in memory, and go feeding them to the workers as new work items are needed?

As I understand it, only (a) needs the requestChan you’re wondering about (see below). For (b), something as simple as the following would suffice:

# B version

type WorkItem int

const (
  N = 5  // Number of workers
  X = 15 // Number of work items to get from the infinite queue at once
)

func Worker(id int, workChan <-chan WorkItem) {
  for {
    item := <-workChan
    doWork(item)
    fmt.Printf("Worker %d processes item #%v\n", id, item)
  }
}

func Dispatch(workChan chan<- WorkItem) {
  for {
    items := GetNextFromQueue(X)

    for _, item := range items {
      workChan <- item
      fmt.Printf("Dispatched item #%v\n", item)
    }
  }
}

func main() {
  workChan := make(chan WorkItem) // Shared amongst all workers; could make it buffered if GetNextFromQueue() is slow.

  // Start N workers.
  for i := 0; i < N; i++ {
    go Worker(i, workChan)
  }

  // Dispatch items to the workers.
  go Dispatch(workChan)

  time.Sleep(20 * time.Second) // Ensure main(), and our program, finish.
}

(I’ve uploaded to the Playground a full working solution for (b).)
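The snippets above call doWork() and GetNextFromQueue(), which are only defined in the linked Playground code. A minimal stand-in (an assumed implementation for illustration, not taken from that solution) might be:

```go
package main

import (
	"fmt"
	"time"
)

type WorkItem int

var next WorkItem // simulated read position in the "infinite" queue

// GetNextFromQueue stands in for the real queue client and simply hands
// back the next n consecutive items. (Assumed implementation.)
func GetNextFromQueue(n int) []WorkItem {
	items := make([]WorkItem, n)
	for i := range items {
		items[i] = next
		next++
	}
	return items
}

// doWork simulates a job taking a little while.
func doWork(item WorkItem) {
	time.Sleep(10 * time.Millisecond)
}

func main() {
	fmt.Println(GetNextFromQueue(3)) // prints "[0 1 2]"
	fmt.Println(GetNextFromQueue(2)) // prints "[3 4]"
}
```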

As for (a), the workers change to say: do work, or if there’s no more work, tell the dispatcher to get more via the reqChan communication channel. That “or” is implemented via select. Then, the dispatcher waits on reqChan before making another call to GetNextFromQueue(). It’s more code, but ensures the semantics that you might be interested in. (The previous version is overall simpler, though.)

# A version

func Worker(id int, workChan <-chan WorkItem, reqChan chan<- int) {
  for {
    select {
    case item := <-workChan:
      doWork(item)
      fmt.Printf("Worker %d processes item #%v\n", id, item)
    case reqChan <- id:
      fmt.Printf("Worker %d thinks they requested more work\n", id)
    }
  }
}

func Dispatch(workChan chan<- WorkItem, reqChan <-chan int) {
  for {
    items := GetNextFromQueue(X)

    for _, item := range items {
      workChan <- item
      fmt.Printf("Dispatched item #%v\n", item)
    }

    id := <-reqChan
    fmt.Printf("Polling the queue in Dispatch() at the request of worker %d\n", id)
  }
}

(I’ve also uploaded to the Playground a full working solution for (a).)

Dato
  • On second thoughts, the second version might be as well be scratched altogether: it adds next to nothing, because select is not guaranteed to succeed in top-bottom order: “If multiple cases can proceed, a uniform pseudo-random choice is made to decide which single communication will execute.” ([link](http://golang.org/ref/spec#Select_statements)) So as soon as the dispatcher is waiting on `reqChan`, one of the workers may write to it. — Is there something of the simpler version that is not enough for what you want? – Dato Mar 26 '14 at 03:44