
How can I set a blocking function to be run in an executor, in a way that the result doesn't matter, so the main thread isn't slowed down or forced to wait for it?

To be honest I'm not sure if this is even the right solution. All I want is some kind of processing queue separated from the main process, so that it doesn't block the server application from returning responses, since this type of web server runs one worker for many requests.

Preferably I would like to stay away from solutions like Celery, but if that's the best fit I'm willing to learn it.

The context here is an async web server that generates PDF files with large images.

import asyncio
from concurrent.futures import ProcessPoolExecutor

from sanic import Sanic, response

app = Sanic()
# App "global" worker
executor = ProcessPoolExecutor(max_workers=5)

@app.route('/')
async def getPdf(request):
  asyncio.create_task(renderPdfsInExecutor(request.json))
  # This should be returned "instantly" regardless of pdf generation time
  return response.text('Pdf being generated, it will be sent to your email when done')

async def renderPdfsInExecutor(json):
  asyncio.get_running_loop().run_in_executor(executor, syncRenderPdfs, json)

def syncRenderPdfs(json):
  # Some PDF library that downloads images synchronously
  pdfs = somePdfLibrary.generatePdfsFromJson(json)
  sendToDefaultMail(pdfs)

The above code gives the following error (yes, it is running as admin):

PermissionError [WinError 5] Access denied
Future exception was never retrieved

Bonus question: do I gain anything by running an asyncio loop inside the executor, so that if it is handling several PDF requests at once it distributes the processing between them? If yes, how do I do it?

Mojimi
  • If I await the `run_in_executor` call it won't block the main loop, but it will block the function/response from being returned "instantly" – Mojimi Feb 18 '19 at 14:34
  • @freakish it doesn't block the loop, but it blocks the `getPdf` function since it is awaiting; at least that's what happened when I tested it directly – Mojimi Feb 18 '19 at 14:36

2 Answers


Ok, so first of all there is a misunderstanding. This

async def getPdf(request):
    asyncio.create_task(renderPdfsInExecutor(request.json))
    ...

async def renderPdfsInExecutor(json):
    asyncio.get_running_loop().run_in_executor(executor, syncRenderPdfs, json)

is redundant. It is enough to do

async def getPdf(request):
    asyncio.get_running_loop().run_in_executor(executor, syncRenderPdfs, request.json)
    ...

or (since you don't want to await) even better

async def getPdf(request):
    executor.submit(syncRenderPdfs, request.json)
    ...
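As an aside, `executor.submit` returns a `concurrent.futures.Future`, so even when you fire and forget you can attach a done-callback to surface failures. A minimal sketch (the `logIfFailed` helper is illustrative, not part of the original code):

import logging

logger = logging.getLogger(__name__)

def logIfFailed(future):
    # Runs when the background job finishes; logs any otherwise-swallowed exception.
    exc = future.exception()
    if exc is not None:
        logger.error('PDF rendering failed', exc_info=exc)

async def getPdf(request):
    future = executor.submit(syncRenderPdfs, request.json)
    future.add_done_callback(logIfFailed)
    ...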

Now the problem you get is because syncRenderPdfs throws PermissionError. It is not handled, so Python warns you: "Hey, some background code threw an error, but the code is not owned by anyone, so what the heck?" That's why you get Future exception was never retrieved. You have a problem with the PDF library itself, not with asyncio. Once you fix that inner problem, it is also a good idea to be safe:

def syncRenderPdfs(json):
    try:
        #Some PDF Library that downloads images synchronously
        pdfs = somePdfLibrary.generatePdfsFromJson(json)
        sendToDefaultMail(pdfs)
    except Exception:
        logger.exception('Something went wrong')  # or whatever

Your "permission denied" issue is a whole different thing and you should debug it and/or post a separate question for that.

As for the final question: yes, the executor will queue and evenly distribute tasks between its workers.
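To illustrate the queuing (a minimal, standalone sketch unrelated to the PDF code): with 2 workers and 5 submitted jobs, the extra jobs simply wait in the executor's queue until a worker is free.

import os
import time
from concurrent.futures import ProcessPoolExecutor

def slowJob(i):
    time.sleep(1)  # stand-in for a heavy PDF render
    return f'job {i} done in process {os.getpid()}'

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        # 5 submissions, 2 workers: the remaining jobs queue until a worker frees up.
        futures = [executor.submit(slowJob, i) for i in range(5)]
        for f in futures:
            print(f.result())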

EDIT: As we've talked about in the comments, the actual problem might be with the Windows environment you work on, or more precisely with the ProcessPoolExecutor, i.e. spawning processes may change permissions. I advise using ThreadPoolExecutor, assuming it works fine on your platform.
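The swap itself is mechanical; a minimal sketch of the change, assuming the rest of the code stays the same:

from concurrent.futures import ThreadPoolExecutor

# Threads run inside the server process, so they inherit its permissions
# and working directory, avoiding the process-spawning quirks on Windows.
executor = ThreadPoolExecutor(max_workers=5)

async def renderPdfsInExecutor(json):
    asyncio.get_running_loop().run_in_executor(executor, syncRenderPdfs, json)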

freakish
  • But the permission denied never happens if I run it outside the executor – Mojimi Feb 18 '19 at 14:38
  • Also, following your theory, wouldn't you need to `await` `run_in_executor`? – Mojimi Feb 18 '19 at 14:39
  • @Mojimi No, your whole point is to run the task in the background, right? Anyway the permission denied issue is something else. It is possible that this is because you use `ProcessPoolExecutor` which spawns processes. Try with `ThreadPoolExecutor`. – freakish Feb 18 '19 at 14:41
  • So do coroutines get executed even without calling await on them? Wasn't aware of that. – Mojimi Feb 18 '19 at 14:42
  • @Mojimi No, coroutines/tasks are executed when passed to `.create_task` or when created via `.run_in_executor` (or in some other rare situations). In that scenario they don't need await. But not in all scenarios. As for your permission denied: I don't know why would Windows not work with ThreadPoolExecutor. Anyway it looks like there's a problem with your environment. – freakish Feb 18 '19 at 14:44
  • Thanks, I think I understand. I will test and report back. I believe the WinError is due to the executor not having access to relative file paths, so I need to change them to absolute – Mojimi Feb 18 '19 at 14:54
  • You were correct, this works! And using the default executor there were no permission errors, but as you said that's for another question, thanks – Mojimi Feb 18 '19 at 15:02

You can look at asyncio.gather(*tasks) to run multiple tasks concurrently.

Remember that such concurrent tasks only work well if they are I/O bound and not blocking.

An example from the Python docs (https://docs.python.org/3/library/asyncio-task.html):

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())
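
The same pattern works when the coroutines are built dynamically as a list and unpacked into gather; a minimal sketch (the names are illustrative):

import asyncio

async def renderPdf(name):
    await asyncio.sleep(1)  # stand-in for real async work
    return f'{name}.pdf'

async def main():
    names = ['invoice', 'report', 'summary']
    # Build the list of coroutines, then run them concurrently.
    tasks = [renderPdf(name) for name in names]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())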
Christo Goosen
  • This can be very helpful if you explain it better; you could add code examples of how to define lists of "tasks" and send them to the "gather" function. – Rea Haas Nov 08 '21 at 13:22