
I have a small worker pool program, and I want to be able to stop the execution of a single job and return a result for it if a "cancel" request has been received for that job.

Currently I have the code below, but if I send a "cancel" message and attempt to return on the "results" channel, nothing happens and my job continues until it is finished. How can I achieve the functionality I am looking for (stopping a job without closing the channel)?

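package main

import (
    "fmt"
    "time"
)

const numJobs = 8 // assumption: one buffer slot per operation

var opChan = make(chan OperationReq, numJobs)
var opResChan = make(chan string, numJobs)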

func main() {
    operations := []string{
        "1", "2", "3", "4", "5", "6", "7", "8",
    }

    fmt.Println("Starting workers")
    for i := 0; i < 2; i++ {
        go worker(opChan, opResChan)
    }

    for _, o := range operations {
        cancelChan := make(chan bool)
        opChan <- OperationReq{
            OperationID: o,
            CancelChan:  cancelChan,
        }
        time.Sleep(1 * time.Second)
        fmt.Println("will send cancel req")
        cancelChan <- true // unbuffered: blocks until some worker receives from cancelChan
    }

    for a := 1; a <= len(operations); a++ {
        res := <-opResChan
        fmt.Println(res)
    }
}

type OperationReq struct {
    OperationID string
    CancelChan  chan bool
}

func checkCancelled(o *OperationReq) bool {
    select {
    case cancel := <-o.CancelChan:
        fmt.Println("received cancel request: ", cancel, " for operation: ", o.OperationID)
        return true
    default:
        return false
    }
}
func worker(op <-chan OperationReq, results chan<- string) {
    for o := range op {
        fmt.Println("Starting operation: ", o.OperationID)
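        if checkCancelled(&o) {
            results <- "Canceled operation: " + o.OperationID
            return // exits this worker goroutine, not just the current job
        }
        time.Sleep(10 * time.Second) // simulated work, not interruptible; alternative: time.Duration(rand.Intn(15)) * time.Second
        if checkCancelled(&o) {
            results <- "Canceled operation: " + o.OperationID
            return // exits this worker goroutine, not just the current job
        }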
        fmt.Println("Finished operation: ", o.OperationID)

        results <- "Done operation: " + o.OperationID
    }
}

anho
  • You haven't told us anything about this operation... how would you stop your operation half-way through? Do that. – Jonathan Hall Aug 09 '21 at 11:25
  • You have a 10-second sleep. That will not get interrupted. After that you unconditionally send a value on `results`. You have to "continuously" monitor the cancel channel, and abort the job yourself. See the marked duplicate for details. – icza Aug 09 '21 at 11:27
  • @icza looking at the duplicate, would I then need to do a select just after the `time.Sleep`? Of course I can't stop the sleep, but I want to exit before the result is returned after the sleep if the operation is cancelled. – anho Aug 09 '21 at 11:34
  • Yes, you'd need a (non-blocking) `select`, and if the cancel channel is closed, return early. – icza Aug 09 '21 at 11:42
  • Hmm, I am trying that, but it seems like I am running into a deadlock, as the return on the result channel doesn't happen. @icza I've updated the code snippet in the question; would you mind giving it a look, please? – anho Aug 09 '21 at 11:48
  • That can easily happen: you create cancel channels and you send on them unconditionally. If there is no active worker that would receive from it, then sending on it will block. Cancels are often signaled by closing the channel: closing it does not block, and all "listeners" will get notified. Sending a value on a channel can only signal 1 receiver. – icza Aug 09 '21 at 11:52
  • @icza I have tried different things, but I am not sure I fully understand where or how I should be closing my channel. Can you please, even in pseudo code, provide me with an example? – anho Aug 09 '21 at 12:43
  • You close the channel where your app decides the job is no longer needed. – icza Aug 09 '21 at 12:55
  • My issue in that case is that closing the channel will also stop the other jobs in the queue. I want to use this design to build an API that can start and stop jobs. These jobs will go on my `opChan`, and if I understand correctly, if I have to close the channel to stop a job, I can no longer accept start requests and the jobs left in the queue will not be processed either. Right, @icza? If that's the case, how can I accomplish what I am looking for by closing the channel? – anho Aug 09 '21 at 13:06
  • If you only need to stop a specific job and not all, then use separate channels, and only close the channel which you use for that specific job. – icza Aug 09 '21 at 13:24
  • @icza that makes sense, thank you. Just one last question: how would you implement QoS in a setup where the channel is created when the request is created? You can't use the channel buffer for that anymore, so you would need to manage the whole state somehow, right? – anho Aug 09 '21 at 13:34
  • @anho This question is too vague to answer correctly without more specific details. Maybe come up with a minimal reproducible example and post it as a new question. – icza Aug 09 '21 at 13:37
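For the start/stop API raised in the last few comments, one possible shape (a sketch, not the only design) keeps the buffered job queue, which can still bound how many jobs wait because each job's cancel channel travels inside the queued value, and adds a mutex-protected map from job ID to cancel channel so a stop request closes exactly one channel. The `Registry`, `Job`, `Start`, `Cancel`, and `Finish` names, the error values, and rejecting work when the buffer is full are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Job is a queued unit of work with its own cancel channel (hypothetical type).
type Job struct {
	ID     string
	Cancel chan struct{}
}

var (
	ErrQueueFull    = errors.New("queue full")
	ErrDuplicateJob = errors.New("duplicate job")
	ErrUnknownJob   = errors.New("unknown job")
)

// Registry keeps one cancel channel per pending or running job, so a stop
// request can close that channel without touching the shared queue.
// Assumption: job IDs are unique.
type Registry struct {
	mu    sync.Mutex
	jobs  map[string]chan struct{}
	queue chan Job // the buffer still bounds how many jobs can wait
}

func NewRegistry(queueSize int) *Registry {
	return &Registry{
		jobs:  make(map[string]chan struct{}),
		queue: make(chan Job, queueSize),
	}
}

// Start registers and enqueues a job without blocking; if the buffer is
// full the caller gets ErrQueueFull (e.g. to answer "try again later").
func (r *Registry) Start(id string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.jobs[id]; ok {
		return ErrDuplicateJob
	}
	c := make(chan struct{})
	select {
	case r.queue <- Job{ID: id, Cancel: c}:
		r.jobs[id] = c
		return nil
	default:
		return ErrQueueFull
	}
}

// Cancel closes one job's channel. Deleting the entry under the lock
// guarantees the channel is closed at most once.
func (r *Registry) Cancel(id string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	c, ok := r.jobs[id]
	if !ok {
		return ErrUnknownJob
	}
	close(c)
	delete(r.jobs, id)
	return nil
}

// Finish is meant to be called by a worker once a job ends, cancelled or not.
func (r *Registry) Finish(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.jobs, id)
}

func main() {
	r := NewRegistry(2)
	fmt.Println(r.Start("a"))  // <nil>
	fmt.Println(r.Start("b"))  // <nil>
	fmt.Println(r.Start("c"))  // queue full
	fmt.Println(r.Cancel("a")) // <nil>
	fmt.Println(r.Cancel("a")) // unknown job

	// Stand-in for workers: dequeue each job and check its cancel channel.
	for i := 0; i < 2; i++ {
		j := <-r.queue
		select {
		case <-j.Cancel:
			fmt.Println(j.ID, "cancelled")
		default:
			fmt.Println(j.ID, "still active")
		}
		r.Finish(j.ID)
	}
}
```

A real worker would range over the queue, `select` on `j.Cancel` while doing its work, and call `Finish` when the job ends so the map does not grow without bound. Reusing an ID while an older job with that ID is still queued would confuse this simple map, which is why unique IDs are assumed.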
