
I have a concept here that I'm not sure how to implement correctly in Go with minimal impact on the system.

I'm making a 'print spooler' where clients can call an API (/StartJob) to process print jobs.

Since there is only one printer, the bottleneck is a single worker that processes one job at a time. Clients can submit a job at any moment; it simply queues up, and the worker processes each job in turn, taking however long each one takes.

The way I'm doing it is that ServeHTTP pushes the job onto the channel (note that I only pass on the job ID; the worker looks up the print data from that):

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.URL.Path {
    case "/StartJob":
        newPrintJob := QueueElement{jobid: "jobid"}
        gv.printQueue <- newPrintJob
        fmt.Fprintf(w, "instant reply from serveHTTP\r\n")

    default:
        fmt.Fprintf(w, "No such Api")
    }
}

The worker then just runs all the time and processes any jobs coming in. The real code does a bit more, but in the end it executes an external process:

func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        processExec("START /i /b processAndPrint.exe -" + job.jobid)
    }
}

The thing here is that the external process can take time to execute; sometimes it's instant, but under some circumstances it can take a minute to perform the task before it returns.

My problem is that ServeHTTP writes back to the client instantly, with no knowledge of whether the job was first in line and processed immediately, or whether it was queued up and is seconds or minutes away from being processed:

  fmt.Fprintf(w, "instant reply from serveHTTP\r\n")

What I would like is to give the client up to 5 seconds to get a reply if their job was processed within that time, or, if not, tell them they need to call back later to check the status of the job.

I had several approaches in mind:

  1. In my QueueElement I pass on the http.ResponseWriter so the worker can write to it (reply to the client). I can do this if I let ServeHTTP sleep, since the ResponseWriter shuts down when the handler goroutine exits. So ServeHTTP would need to wait, and while it's waiting the worker is allowed to write to the ResponseWriter.

    The problem with this approach is that if the job is minutes away, the worker won't write anything to that ResponseWriter, and ServeHTTP wouldn't know whether a reply had been sent from the worker.

  2. I could create a channel for each QueueElement, so that ServeHTTP and the worker (in fact the specific job being processed by the worker) are able to communicate with each other.

    This approach I haven't tested, but I also worry that it's overkill and heavy on the system, since we can have many, many API requests coming in and thereby a big queue being processed. So even though I would need to time out / cancel it after 5 seconds, I think the concept is overkill?

  3. I could maybe pass a mutex-protected value in the QueueElement that ServeHTTP could check for up to 5 seconds and that the worker could check/manipulate, but when a job is finished the QueueElement disappears, so this might result in conflicts.

  4. I could do a variation of no. 1 where I write my own ResponseWriter with a flag indicating whether something has been written to it already. ServeHTTP would check that flag for up to 5 seconds: if the worker has already written a reply to the client, ServeHTTP exits without answering; if nothing has been written, ServeHTTP writes the message back to the client itself (sketched below).
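
    To illustrate, a rough sketch of such a wrapping writer might look like this (the flaggedWriter name is just a placeholder of mine, assuming net/http and sync are imported; untested):

    type flaggedWriter struct {
        http.ResponseWriter
        mu      sync.Mutex
        written bool
    }

    // Write marks the response as written before delegating to the real writer.
    func (fw *flaggedWriter) Write(b []byte) (int, error) {
        fw.mu.Lock()
        defer fw.mu.Unlock()
        fw.written = true
        return fw.ResponseWriter.Write(b)
    }

    // Written reports whether the worker has already replied.
    func (fw *flaggedWriter) Written() bool {
        fw.mu.Lock()
        defer fw.mu.Unlock()
        return fw.written
    }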

But none of these feel very smooth, and I don't want to launch an endless number of goroutines or channels, or lock myself into mutexes everywhere, as I don't know what impact that has on the system.

Can anyone suggest a correct, working way to implement something like this? I've been reading page up and page down and haven't found a nice, clean way of achieving it.

MdTp
  • `gv.printQueue`: is it a buffered channel? – Ankit Deshpande May 31 '19 at 07:39
  • I created it as unbuffered, but I'm not exactly sure that's the right choice. I did it because I didn't want an upper limit on the number of elements that can be put into the channel/queue, but I read that they act differently. What would you recommend? (I don't want to block incoming API requests; basically I'd like to be able to handle any number of them and push them onto the channel/queue.) – MdTp May 31 '19 at 09:03
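
As an aside on the buffering question above, a buffered queue would be created roughly like this (the capacity of 100 is an arbitrary example, not from the original code); sends from ServeHTTP then only block once that many jobs are already waiting:

    gv := &GlobalVariables{
        // Buffered queue: sends from ServeHTTP return immediately until 100
        // jobs are queued; an unbuffered channel blocks until the worker
        // receives the job.
        printQueue: make(chan QueueElement, 100),
    }
    go worker(gv.printQueue)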

2 Answers


I think the easiest approach is the first one, slightly modified. You can pass the http.ResponseWriter to the worker, which spawns another goroutine that actually carries out the job, while the "parent" worker waits for its completion or for a timeout. It replies to the HTTP client as soon as the first of the two events occurs.

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        newPrintJob := QueueElement{writer: w, jobid: "jobid"}
        gv.printQueue <- newPrintJob
        // The reply is now written by the worker via job.writer, so ServeHTTP
        // must not return before the worker has replied.

    default:
        fmt.Fprintf(w, "No such Api")
    }
  }

func worker(jobs <-chan QueueElement) {
    for {
        done := make(chan struct{})
        job := <-jobs

        go func(d chan struct{}, q QueueElement) {
            processExec("START /i /b processAndPrint.exe -" + q.jobid)
            d <- struct{}{}
        }(done, job)

        select {
        // Timeout: tell the client the job is still queued/running.
        case <-time.After(time.Second * 5):
            fmt.Fprintf(job.writer, "job is taking more than 5 seconds to complete\r\n")
        // Job processing finished in time.
        case <-done:
            fmt.Fprintf(job.writer, "job finished\r\n")
        }
    }
}

You can spawn the "waiting" goroutine as soon as you receive the HTTP request. In this way the timeout timer will take into account the entire processing of the request/job.

Example:

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
        newPrintJob := QueueElement{
            doneChan: doneC,
            jobid:    "jobid",
        }
        go func(doneChan chan struct{}) {
            // Use context.Background() here: the request context would be
            // cancelled as soon as ServeHTTP returns.
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            select {
            // If this triggers first, this waiting goroutine exits and nobody
            // would be listening on 'doneChan'. This is why it has to be buffered.
            case <-ctx.Done():
                fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
            case <-doneChan:
                fmt.Fprintf(w, "job finished\r\n")
            }
        }(doneC)
        gv.printQueue <- newPrintJob

    default:
        fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        processExec("START /i /b processAndPrint.exe -" + job.jobid)
        job.doneChan <- struct{}{}

    }
}
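
If writing to w from a goroutine after ServeHTTP has returned turns out to be a problem, a variation (a rough sketch, assuming the same QueueElement and worker as above, plus a buffered printQueue) is to do the waiting inside ServeHTTP itself; net/http already runs every request in its own goroutine, so blocking here for up to 5 seconds does not hold up other requests:

    func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        switch r.URL.Path {
        case "/StartJob":
            doneC := make(chan struct{}, 1) // buffered so the worker never blocks on it
            // Assumes gv.printQueue is buffered, so this send returns right away.
            gv.printQueue <- QueueElement{doneChan: doneC, jobid: "jobid"}

            select {
            case <-doneC:
                fmt.Fprintf(w, "job finished\r\n")
            case <-time.After(5 * time.Second):
                fmt.Fprintf(w, "job queued, check back later for its status\r\n")
            }

        default:
            fmt.Fprintf(w, "No such Api")
        }
    }

Here the ResponseWriter is only ever used before ServeHTTP returns, so nothing is written to a dead connection.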
Giulio Micheloni
  • this was a smart abstraction for handling long jobs once the job is taken out of the queue. The problem with this, though, is that if the job is way back in the queue/channel (the worker isn't close to processing this jobid yet), ServeHTTP would still be left in the unknown? – MdTp May 31 '19 at 08:48
  • The solution would be to spawn the goroutine closer to where you get the HTTP request. I edited my answer with a new example for this. You would still need to spawn an additional goroutine for each HTTP request. Following the same pattern I am suggesting, you can spawn only one "waiter" goroutine at the beginning and carry the job ID in the channel message. – Giulio Micheloni May 31 '19 at 09:09
  • that's a pretty solution and makes sense, thanks a lot. But what about goroutines: would this be OK if, say, 1000 API requests were pumping in, thereby creating twice as many goroutines as just ServeHTTP normally would? Is that heavy on the system, or are those numbers not even close to heavy? – MdTp May 31 '19 at 09:16
  • You can definitely implement a version of the above example where the "waiter" goroutine is just one. It would need to keep track of ongoing HTTP requests, set timer for those and wait for their "done" channel. All of this inside a single `select{}` block. There is memory complexity involved there. But, as they say, _Premature optimization is the root of all evil_. Moreover, Go production code can easily spawn hundreds of thousands goroutines. More details [here](https://stackoverflow.com/questions/8509152/max-number-of-goroutines). – Giulio Micheloni May 31 '19 at 09:25
  • yes, you're right, that was a great approach, having one goroutine that handles all the waiters! Awesome, thanks so much Giulio, also for the added explanation about goroutines! – MdTp May 31 '19 at 09:29
  • in your example, the `ctx, cancel := context.WithTimeout(ctx, 5*time.Second)`: where should ctx be created? I'm not 100% sure how this works; do I need to create the context in the channel struct, or? – MdTp May 31 '19 at 15:05
  • Can you clarify? It is not common to store a context.Context in a data structure. It would be better to explicitly pass it to the function call or goroutine. – Giulio Micheloni May 31 '19 at 15:14
  • it's just that I'm trying to implement the code here and I'm not sure where I need to declare the ctx (if I write the code the same way as in your example, `ctx, cancel := context.WithTimeout(ctx, ...)` complains that it doesn't know ctx at that point; first time I'm trying context). – MdTp May 31 '19 at 15:39
  • hmm, I can't get it working with this approach. Launching the goroutine inside ServeHTTP does get the reply from the worker, but the problem is that ServeHTTP exits right away (and then the connection to the ResponseWriter is lost). If I make it a plain function call instead of a go routine, the concept works, but then ServeHTTP blocks, so if I launch e.g. 10 API requests against it they are handled sequentially, which gives longer and longer response times the more API requests are sent :( – MdTp May 31 '19 at 21:16

I would avoid holding the request for a long time because we are not sure when the job will be processed.

One approach I can think of is:

Initially, the server replies with an accepted/queued status and returns a job_id.

{
   "job_id": "1",
   "status": "queued"
}

The client can poll (say, every 5 seconds) or use long polling to check the status of the job.

While it is running:

{
   "job_id": "1",
   "status": "processing"
}

After completion:

{
   "job_id": "1",
   "status": "success/failed"
}
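
A rough sketch of what the status-tracking side could look like (the /JobStatus path, the jobStore type, and the status strings are placeholders, assuming the usual encoding/json, net/http, and sync imports; the worker would call set() as the job moves through the queue):

    type jobStore struct {
        mu     sync.Mutex
        status map[string]string // job_id -> "queued" | "processing" | "success" | "failed"
    }

    func (s *jobStore) set(id, st string) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.status[id] = st
    }

    func (s *jobStore) get(id string) (string, bool) {
        s.mu.Lock()
        defer s.mu.Unlock()
        st, ok := s.status[id]
        return st, ok
    }

    // Polling endpoint, e.g. GET /JobStatus?job_id=1
    func (s *jobStore) handleJobStatus(w http.ResponseWriter, r *http.Request) {
        id := r.URL.Query().Get("job_id")
        st, ok := s.get(id)
        if !ok {
            http.Error(w, `{"error":"unknown job_id"}`, http.StatusNotFound)
            return
        }
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(map[string]string{"job_id": id, "status": st})
    }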

Ankit Deshpande
  • yes, this would be my final resort: doing an instant reply and demanding polling. But I would like to avoid it somehow, since in some cases there will be no load on the channel and the worker will process the job right away. In my head I compare it a bit to calling an API that requests a larger lot of data from a database: there you also sometimes have a processing time before the API replies, without being required to poll. – MdTp May 31 '19 at 08:51
  • You can then pass a result channel in the job struct, when the worker is finished, it can write results to the result channel – Ankit Deshpande May 31 '19 at 09:00
  • is it good practice to start up that many channels? (Even though they live at most 5 seconds if the job can't be processed yet.) Also, in case the worker hasn't even gotten to the job yet, we will exit ServeHTTP after 5 seconds (timeout on a reply on a channel?), and then the worker will write onto a dead channel where nobody is listening anymore? – MdTp May 31 '19 at 09:07
  • Write a method to check the status of the job. After 5 seconds, or right before exiting ServeHTTP, check the status and return it. If the status is not finished, hit the API later with the job_id and check the status. This avoids the result channel. – Ankit Deshpande May 31 '19 at 09:27
  • if it goes longer than 5 seconds, then I inform the client they have to poll from there. I was thinking of storing the status of each job separately and just doing a check, but then I think I would have to put a mutex around the check value and create 2 storage areas (the channel just for work elements, and e.g. a map or a database if I want to store job statuses). I just don't like doing a loop with a sleep in it that checks a status value every 250 ms :/ – MdTp May 31 '19 at 09:32
  • and I would like to let the client loose as soon as possible; I don't want to force them to wait 5 seconds every time. If the job has been processed right away, I want to inform them right away. – MdTp May 31 '19 at 09:33