
I have been struggling for a few days now with a Python application that is supposed to look for one or more files in a folder, iterate through each file and each record in it, and create objects to be persisted to a JanusGraph database. The particular OGM that I am using requires that transactions with the database are done asynchronously using asyncio. I have read a lot of blogs and posts about asyncio, and I think I understand the concepts of async, await, tasks, etc. In my application I have defined several functions that handle different parts of the processing:

  • Retrieves the list of all files available
  • Selects one file for processing
  • Iterates through the selected file and reads a line/record for processing
  • Receives the record, parses it, and calls several other functions that are responsible for creating the Model objects before they are persisted to the database. For instance, I have different functions that create User, Session, Browser, DeviceUsed, Server, etc.

I understand (and I may be wrong) that the big advantage of using asyncio is for situations where the call to a function will block usually for I/O, database transaction, network latency, etc...

So my question is whether I need to convert all my functions into coroutines and schedule them to run through the event loop, or just the ones that would block, like committing a transaction to the database. I tried this approach to begin with and had all sorts of problems.

martineau
Cracoras
  • If you have an existing synchronous codebase, you can generally transform it by converting all functions that do blocking calls to async, then following the chain of callers up to the top so that every function that at least sometimes directly or indirectly calls something async is now async. But if you're working from scratch, it's usually better to think of those async chains before you write anything. – abarnert Apr 04 '18 at 20:52

1 Answer


So my question is if I need to convert all my functions into coroutines and schedule to run through the event loop, or just the ones that would block,

You might need to convert most of them, but the conversion should be largely mechanical, boiling down to changing def to async def, and adding await when calling other coroutines.
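As a minimal sketch of that mechanical conversion (the names `save` and `process` are hypothetical, and `asyncio.sleep(0)` merely stands in for a real asynchronous database call):

```python
import asyncio

# Hypothetical stand-in for an asynchronous database call.
async def save(record):
    await asyncio.sleep(0)  # placeholder for real async I/O
    return f"saved {record}"

# Before the conversion this might have read:
#     def process(record):
#         return save(record)
# After: "def" becomes "async def", and the call to a coroutine gains "await".
async def process(record):
    return await save(record)

# asyncio.run() requires Python 3.7+; on older versions use
# loop.run_until_complete() instead.
result = asyncio.run(process("row-1"))
print(result)
```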

Obviously, you cannot avoid converting the ones that actually block, either by switching to the appropriate asyncio API or by using loop.run_in_executor() for those that don't have one. (DNS resolution used to be an outstanding example of the latter.)
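For a blocking call that has no asyncio equivalent, wrapping it with `run_in_executor()` could look roughly like this. The function `blocking_lookup` is a made-up example, and `time.sleep` simulates the blocking work:

```python
import asyncio
import time

def blocking_lookup(key):
    # Hypothetical blocking call with no asyncio counterpart.
    time.sleep(0.01)
    return key.upper()

async def lookup(key):
    loop = asyncio.get_running_loop()  # Python 3.7+
    # Passing None selects the loop's default thread pool executor, so the
    # blocking call runs in a worker thread while the event loop stays free.
    return await loop.run_in_executor(None, blocking_lookup, key)

async def main():
    # Both lookups run concurrently in the thread pool.
    return await asyncio.gather(lookup("a"), lookup("b"))

results = asyncio.run(main())
print(results)
```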

But then you also need to convert their callers, because calling a coroutine from a blocking function is not useful unless the function implements event-loop-like functionality. On the other hand, when a coroutine is called from another coroutine, everything works because suspends are automatically propagated to the top of the chain. Once the whole call chain consists of coroutines, the top-level ones are fed to the event loop using loop.create_task() or loop.run_until_complete().
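A small sketch of such a chain, with hypothetical names (`commit`, `process_row`, `process_file`) and `asyncio.sleep(0)` standing in for the real database commit. Only the top-level coroutine is handed to the event loop; everything below it is reached through `await`:

```python
import asyncio

async def commit(record):
    # Bottom of the chain: the only place that actually suspends.
    await asyncio.sleep(0)  # placeholder for an async database commit
    return record

async def process_row(row):
    # Calls a coroutine, so it has to be a coroutine itself.
    return await commit(row.strip())

async def process_file(lines):
    # Top of the chain: awaits each row in turn.
    return [await process_row(line) for line in lines]

# Feed only the top-level coroutine to the event loop.
loop = asyncio.new_event_loop()
try:
    saved = loop.run_until_complete(process_file(["a\n", "b\n"]))
finally:
    loop.close()
print(saved)
```

On Python 3.7 and later, `asyncio.run(process_file(...))` does the loop setup and teardown in one call.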

Of course, convenience functions that neither block nor call blocking functions can safely remain non-async, and are invoked by either sync or async code without any difference.
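To illustrate, here is a sketch in which a purely in-memory helper stays a regular function and is called the same way from sync and async code. `parse_record`, `persist`, and `handle` are invented names, and the record layout is an assumption:

```python
import asyncio

def parse_record(line):
    # Pure in-memory work: it never blocks, so it stays a regular function.
    user, browser = line.split(",")
    return {"user": user, "browser": browser}

async def persist(obj):
    await asyncio.sleep(0)  # placeholder for an async database flush
    return obj

async def handle(line):
    obj = parse_record(line)   # plain call, no await needed
    return await persist(obj)  # coroutine call, must be awaited

sync_result = parse_record("ann,firefox")           # called from sync code
async_result = asyncio.run(handle("ann,firefox"))   # called from a coroutine
print(sync_result, async_result)
```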


The above applies to asyncio, which implements stackless coroutines. A different approach is used by greenlet, whose tasks encapsulate the call stack, which allows them to be switched at arbitrary places in code that uses normal function calls. Greenlets are a bit more heavyweight and less portable than coroutines, though, so I'd first try converting to asyncio.
user4815162342
  • A couple of follow-up questions on your answer: (1) I have a main coroutine at the top of the chain where I read records, and I am assuming that this one needs to be fed to the event loop. That function will in turn chain calls to others that are just doing in-memory data manipulation or creating objects. If I understood your statement correctly, I don't need to call the subsequent ones using await? Is it fine to call them as normal functions? (2) I am also assuming that I can feed the top coroutine using asyncio.ensure_future(), right? – Cracoras Apr 05 '18 at 18:02
  • @Cracoras (1) Exactly, if you are calling simple functions that do in-memory stuff, those don't need to be converted to coroutines and consequently don't need to be awaited - it is fine to call them as normal functions. (2) Yes; given a coroutine, `asyncio.ensure_future` and `loop.create_task` are equivalent, but the latter is [the intended API](https://github.com/python/asyncio/issues/477#issuecomment-268709555) when the argument is known to be a coroutine. You can think of `create_task` (and `ensure_future`) as starting a task "in the background". Also look at `asyncio.gather`. – user4815162342 Apr 05 '18 at 18:39
  • I finally finished refactoring my code, and changed all the inner functions back to regular functions. However, I am getting the following error: file_stats, db_session = process_function(message_types.index(row[0]), row, file_stats, db_session) TypeError: 'coroutine' object is not iterable. This line is inside the last of the coroutines that I call in my stack, and it triggers all the remaining memory-bound functions that I mentioned. – Cracoras Apr 11 '18 at 14:42
  • @Cracoras You need to `await` the coroutine. If it's at the very top of the call chain, you need to call it using `loop.run_until_complete`. – user4815162342 Apr 11 '18 at 17:16
  • In this case, process_function() is not a coroutine, that is why I am not waiting for it. Should I ? The call to it is in a coroutine at the bottom of the stack called: process_rows(): task_process_rows = asyncio.ensure_future(process_rows(row, file_name, file_stats, db_session)) file_stats = await task_process_rows async def process_rows(row, file_name, file_stats, db_session): ... file_stats, db_session = process_function(message_types.index(row[0]), row, file_stats, db_session) await db_session.flush() – Cracoras Apr 11 '18 at 18:13
  • @Cracoras I think `process_function` should be a coroutine because it calls other coroutines. But it's really hard to tell what's going on without seeing the code. Can you create a minimal reproducible example that still demonstrates the problem? – user4815162342 Apr 11 '18 at 19:26
  • I tried to cut down the code in a way that makes sense, but I think it would still not help. So I checked it into git, and here is the full code: https://github.com/marciodebarros/useractivitylogs/blob/master/app/data_loader/routes.py. The error is being thrown on line 195, which is the call to the last coroutine. From there, all the remaining calls are to regular functions that don't do any I/O. I even tried to make process_function a coroutine but got a similar error. Thank you so much for your patience and all your help. --MD – Cracoras Apr 12 '18 at 14:56
  • @Cracoras The problem is that `process_function` is sometimes invoking async functions, and those need to be awaited. In other words, while you can mix ordinary and async functions, async ones always need to be awaited, so if you do a dispatch that passes through a single code path, as `process_function` does, all of its callees must be async, and you need to await them. As a quick fix, you can use `iscoroutine` to see if the result needs to be awaited, [like this](https://pastebin.com/LAtxBsEi). – user4815162342 Apr 13 '18 at 05:01
  • Hi @user4815162342, after your last comment I went back to the code again and realized what you meant by process_function calling coroutines. It turns out none of those were supposed to be coroutines, but regular functions. I reverted them back and was able to run the code without any issues/exceptions. I will still do some additional testing, but it seems like it is working. Thank you so much for all your help and definitely your patience. I hope I don't have to bother you with this issue again ;-) --MD – Cracoras Apr 16 '18 at 15:58
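
The `iscoroutine` quick fix mentioned in the comments could look roughly like the sketch below. This is not the code behind the pastebin link, and it is not taken from the linked repository; the handler names and the dispatch table are invented for illustration. Without the check, the dispatcher would return an un-awaited coroutine object for the async handler, and unpacking it would fail with a TypeError like the one quoted above.

```python
import asyncio

def make_user(row):
    # Regular handler: in-memory work only.
    return ("user", row)

async def make_session(row):
    # Async handler: its result is a coroutine object until awaited.
    await asyncio.sleep(0)  # placeholder for async I/O
    return ("session", row)

# Hypothetical dispatch table mixing regular and async handlers.
HANDLERS = [make_user, make_session]

async def process_function(index, row):
    result = HANDLERS[index](row)
    # Await only when the handler turned out to be a coroutine.
    if asyncio.iscoroutine(result):
        result = await result
    return result

async def main():
    return [await process_function(i, "r") for i in range(len(HANDLERS))]

results = asyncio.run(main())
print(results)
```

The cleaner long-term options are the ones discussed in the thread: make every handler behind the dispatch async and always await, or, as turned out to apply here, make them all regular functions when none of them does I/O.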