0

I'm trying to implement a redis queue in the current system. The job will be sent to another module and it should wait until the job is done and returned the result, job.result, then move on:

with Connection(redis_connection):
    job = job_queue.enqueue(worker_func, func_input1, func_input2)

print("waiting for result")
print(datetime.datetime.now())
while job.result is None:
    pass
print(datetime.datetime.now())
print("got result")

# next step
next_step_func(job.result)

...

I'm facing 2 issues here:

  1. the busy wait, while job.result is None is taking a very long time. My processing in the worker_func is about 2-3 seconds, which involves calling APIs on another server, yet the busy wait while job.result is None itself takes another >= 3 seconds, adding to >= 5 seconds in total. I am positive that the waiting takes place after the execution of while job.result is None because I added logs for both worker_func and while job.result is None :
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
...
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009

as you can see above, the busy wait while loop happens after the worker_func is done.

2, is there any other elegant way to implement this synchronous wait here instead of a busy loop? I think the busy loop here is definitely not the best implementation as it will consume a lot of CPU resources.

Thanks!

-- editing my code above to give a clearer context

I need to value of next_step_func(job.result) to be returned from where job_queue.enqueue is called. so a clearer structure would be:

def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func, func_input1, func_input2)

    print("waiting for result")
    print(datetime.datetime.now())
    while job.result is None:
        pass
    print(datetime.datetime.now())
    print("got result")

    # next step
    return next_step_func(job.result)

...

so the painpoint is that I need the job.result able to be returned in endpoint(), yet the Job Callback will take my job to a different context at on_success.

Daniel Qiao
  • 121
  • 5
  • 15
  • General tip for busy loops - instead of `pass`, use `time.sleep()` for a few milliseconds to the loop in order to prevent excessive CPU hogging. – Tomalak Jul 12 '21 at 11:38

1 Answers1

2

Documentation suggests using the job callbacks as an option:

def job_succeeded(job, connection, result, *args, **kwargs):
    next_step_func(job.result)

def job_failed(job, connection, type, value, traceback):
    # react to the error
    pass

with Connection(redis_connection):
    args = (func_input1, func_input2)
    job_queue.enqueue(worker_func, args=args, on_success=job_succeeded, on_failure=job_failed)
Tomalak
  • 332,285
  • 67
  • 532
  • 628
  • Hi Tomalak, thanks so much for pointing me to the right resource there, but I think I was a bit unclear in my question. After waiting until the `job` is finished, I need to pass the `job` to `next_step_func(job.result)` and return the result of `next_step_func(job.result)` from where `next_step_func(job.result)` (and `job_queue.enqueue`) is called, as I need to return `job.result` as a response to a synchronous endpoint. is there a way I can access the result value of `on_success` from where `job_queue.enqueue` is called? – Daniel Qiao Jul 12 '21 at 12:23
  • @Daniel `on_success` has no result value, so there is nothing to access. Your current setup is this: 1) enqueue task 2) busy waiting 3) call `next_step_func(job.result)`. I think that's exactly what the code in my answer does? – Tomalak Jul 12 '21 at 12:38
  • Sorry if I was a bit unclear in my phrasing. I've editted my question description above with updated code. Basically the entire code is an endpoint function, and the `job.result` need to be returned in this endpoint function but the `on_success` function is taking it away from this endpoint and it seems I cannot return a value in `on_success` – Daniel Qiao Jul 12 '21 at 12:56
  • @Daniel That's a recurring question over in the [javascript] tag: *"How can I return a value from an asynchronous function?"* and the short answer is - you can't. That's why it's asynchronous. It returns before it is finished (this is the effect you are seeing). The answer is always a variation of "You can define a callback function that is called when the asynchronous workload is done, and anything you want to happen then, such as using the returned value, must be done in that callback function." – Tomalak Jul 12 '21 at 17:15
  • also see https://stackoverflow.com/questions/44630676/how-i-call-an-async-function-without-await – Tomalak Jul 12 '21 at 17:29