7

Recently, I have asked a question regarding how to track the progress of a for loop inside a API deployed. Here's the link.

The solution code that worked for me is,

from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid


context = {'jobs': {}}

app = FastAPI()


async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    for file, file_number in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    jobs[job_key]['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }

This way, I can track the progress of the for loop inside the do_work using the identifier by calling status method

Now, I am looking for a way to parallelize the for loop inside do_work method.

But if I use joblib then I don't know how to track each file being processed, the iteration count will be meaningless because all files will be processed in parallel.

Note: I just gave an example with joblib because I am not very familiar with other libraries. The processing on the file is bit heavy cpu based work. I'm preprocessing file and loading 4 tensorflow models and predict it on the file and writing to sql database.

If anyone knows any methods in which I can do it, please share and help me out.

user_12
  • 1,778
  • 7
  • 31
  • 72
  • Is there a reason why you need to run it with joblib instead of `asyncio.run_coroutine_threadsafe` ? With such function you can share variables, which could be a great and simple idea, in my opinion – lsabi Dec 06 '20 at 13:43
  • No, I just gave an example of `joblib` because I am familiar with that library, all I am looking is making my for loop execution parallel with-out losing the functionality of keep tracking which iteration is going on so I track the progress as well – user_12 Dec 06 '20 at 13:46
  • According to `joblib` 's docs, it looks like you can achieve it by sharing variables. This may cause race conditions, but if the function is the only who is working on that `id` and the `id`s are guaranteed to be uniuqe, that should not be the case. See https://joblib.readthedocs.io/en/latest/auto_examples/parallel_memmap.html#sphx-glr-auto-examples-parallel-memmap-py – lsabi Dec 06 '20 at 15:12
  • can you share a working example with my code given above on how you would do it? What do you mean by `id` ? Do you mean the status identifier? – user_12 Dec 06 '20 at 15:15

3 Answers3

2

Im not 100% sure i understood, would something like this work?

async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['iteration'] = 0

    async def do_work_inner(file):
        # do the work on the file here
        job_info['iteration'] += 1
        await asyncio.sleep(0.5)

    tasks = [do_work_inner(file) for file in iter_over]
    job_info['status'] = 'inprogress'
    await asyncio.gather(*tasks)
    jobs[job_key]['status'] = 'done'

This will run all of the work on the file in parallel*, keep in mind that in this case, job_info['iteration'] is mostly meaningless since all of them start together they will increase the value together.

  • This is async-parallel, meaning it's not parallel but the event loop will constantly jump from one task to another.

Pay attention that this is really important what is the actual kind of work you want to perform on the files, if its a cpu-related job (calculations, analysis etc) as opposed to mainly IO related job like web calls, then this is the wrong solution, and should be tweaked a bit, if so let me know and I'll try to update it.

Edit: updated version for cpu-related work, progress shows files completed

This is a relatively complete example, just without the actual server

import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor



jobs = {}
context = {}
executor = ProcessPoolExecutor()


def do_work_per_file(file, file_number):
    # CPU related work here, this method is not async
    # do the work on the file here
    print(f'Starting work on file {file_number}')
    time.sleep(random.randint(1,10) / 10)
    return file_number


async def do_work(job_key, files=None):
    iter_over = files if files else range(15)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['completed'] = 0

    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(executor,do_work_per_file, file, file_number) for file,file_number in enumerate(iter_over)]
    job_info['status'] = 'inprogress'
    for completed_job in asyncio.as_completed(tasks):
        print(f'Finished work on file {await completed_job}')
        job_info['completed'] += 1
        print('Current job status is ', job_info)
        

    jobs[job_key]['status'] = 'done'
    print('Current job status is ', job_info)

if __name__ == '__main__':
    context['jobs'] = jobs
    jobs['abc'] = {}
    asyncio.run(do_work('abc'))

The output is

Starting work on file 0
Starting work on file 1
Starting work on file 2
Starting work on file 3
Starting work on file 4
Starting work on file 5
Starting work on file 6
Starting work on file 7
Starting work on file 8
Starting work on file 9
Starting work on file 10
Starting work on file 11
Starting work on file 12
Starting work on file 13
Starting work on file 14
Finished work on file 1
Current job status is  {'completed': 1, 'status': 'inprogress'}
Finished work on file 7
Current job status is  {'completed': 2, 'status': 'inprogress'}
Finished work on file 9
Current job status is  {'completed': 3, 'status': 'inprogress'}
Finished work on file 12
Current job status is  {'completed': 4, 'status': 'inprogress'}
Finished work on file 11
Current job status is  {'completed': 5, 'status': 'inprogress'}
Finished work on file 13
Current job status is  {'completed': 6, 'status': 'inprogress'}
Finished work on file 4
Current job status is  {'completed': 7, 'status': 'inprogress'}
Finished work on file 14
Current job status is  {'completed': 8, 'status': 'inprogress'}
Finished work on file 0
Current job status is  {'completed': 9, 'status': 'inprogress'}
Finished work on file 6
Current job status is  {'completed': 10, 'status': 'inprogress'}
Finished work on file 2
Current job status is  {'completed': 11, 'status': 'inprogress'}
Finished work on file 3
Current job status is  {'completed': 12, 'status': 'inprogress'}
Finished work on file 8
Current job status is  {'completed': 13, 'status': 'inprogress'}
Finished work on file 5
Current job status is  {'completed': 14, 'status': 'inprogress'}
Finished work on file 10
Current job status is  {'completed': 15, 'status': 'inprogress'}
Current job status is  {'completed': 15, 'status': 'done'}

Basically what changed is now you are opening a new process pool that handles the work on the files, being a new process also means that CPU intensive work will not block your event loop and stop you from querying the status of the job.

Ron Serruya
  • 3,988
  • 1
  • 16
  • 26
  • The most important things to me, is a way to keep tracking the iteration, I want a method or a API endpoint which I can periodically call which will return me which file is being processed. You mentioned `job_info['iteration']` is mostly meaningless but I require the way to know which iteration is going on. – user_12 Dec 06 '20 at 22:03
  • @user_12 but if you are running them all in parallel, doesn't that mean that all iterations are going on at the same time? unless you want to show how many iterations finished, in this case, you would just move the +=1 to the end of the work instead of the start – Ron Serruya Dec 07 '20 at 10:34
  • What I meant, the loop is running parallel 4 times, so for each second now I will have 4 files processed rather than 1 file, so when I call the status API it should return 4 files processed. – user_12 Dec 07 '20 at 11:26
  • I need some way to track the progress. – user_12 Dec 07 '20 at 11:26
  • then, in that case, my answer will work, you have 4 tasks, then launch all of the parallel, in this case `job_info['iteration'] will be 4. I meant that it is meaningless because it will always be 4, as long as its running. You can add `job_info['iteration'] -= 1` at the end of the task, that way if one finishes early you will see 3 instead of 4. – Ron Serruya Dec 07 '20 at 17:34
  • 1
    But like I said its important to know what kind of job you are doing on the files because this is not the right approach for cpu-heavy work – Ron Serruya Dec 07 '20 at 17:34
  • Thank you so much for clarifying. Actually, it's a bit of cpu work but basically I am preprocessing each file and loading 4 trained neural network tensorflow models and predict the predictions on that file and write the results to mysql database on the server. It also takes gpu and memory. Can you tell me what to do when I am doing such cpu based work. – user_12 Dec 07 '20 at 18:07
  • Like you mentioned, If I have 10 parallel work going on, and I only have 10 files then when I hit the status API it just returns 10 at once. So like you mentioned it will be meaningless because it will be 10. Is there a way to have iteration count only if each file completes the processing (like complete processing done on one file)? – user_12 Dec 07 '20 at 19:00
  • @user_12 Updated my comment with a better version for this, I never worked with GPU specific work but I belive it should behave the same – Ron Serruya Dec 07 '20 at 19:23
  • thank you so much for updating the answer, but I need to deploy it using uvicorn `uvicorn main:app`, I can't use `if __name__ == "__main__"`. Can you please update the updated answer based on my example (with api endpoints/methods etc.,) so it be easier for me to understand and use it in my program. – user_12 Dec 08 '20 at 06:07
  • I am not being able to make it as a fastapi async endpoint method I have been trying a lot? Where should I run the `asyncio.run(do_work('abc'))`? Can you please update it based on the example provided in the question. In which method should I use it? – user_12 Dec 09 '20 at 09:26
0

EDIT

It seems that the Parallel function of joblib is blocking the thread that answers to requests. A probably better solution is the one of @Ron Serruya, where they managed to not block the main thread.

OLD ANSWER

Here's a potential solution. note that I didn't test, but it should be enough to give you a rough idea. Also, I wasn't 100% sure of what you needed so it definitely needs a review before from your side before running it.

Despite that, I do not understand why you don't use a database for keeping the status of the iteration. This way, you avoid running into concurrency (or rails) issue and can keep the state of the iteration/training even in case of power failure.

from fastapi import FastAPI, UploadFile
from typing import List
import asyncio
import uuid
from joblib import Parallel, delayed


context = {'jobs': {}}

app = FastAPI()

def parallelize(iterate_count):
    # Function part that needs to be run in parallel
    iterate_count += 1
    

async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs'][job_key]
    jobs["iteration"] = 0
    jobs['status'] = 'inprogress'
    Parallel()(delayed(parallelize)(jobs["iteration"]) for file, file_number in enumerate(iter_over))
    jobs['status'] = 'done'


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }
lsabi
  • 3,641
  • 1
  • 14
  • 26
  • I think you haven't understood my problem, can you check again I updated the question a bit. Where do I process each file? – user_12 Dec 06 '20 at 22:07
  • The code I posted is parallelizing the iteration within the function `do_work`. Since you have to call a function to be run in parallel, I had to write a new function, namely,`parallelize` that performs the parallel computation. Problem is, when you ditch the work to other threads, there's a rails condition which I tried to take into account and thus had to change some of the context's structure in order to avoid as much as I could the problem – lsabi Dec 06 '20 at 22:16
  • I think you are still not getting what I am looking for, if you use `Parallel()` then the `iteration` count will be meaning less right? It won't reflect the true file being processed. But I need a way to track the iteration/file being processed and being able to parallelize it. – user_12 Dec 07 '20 at 08:36
  • I don't get it. In the piece of code I show how to track the reading and processing of files (although, I wrote a comment to indicate where to place your code). That does exactly what you've been asking for. Since `Parallel()` is useless, then why do you even ask? I'm not getting it. Try the code first (adjust it before since I didn't test it) and then come back and tell the adjustments you had to do and if it solved your problem or not – lsabi Dec 07 '20 at 13:33
  • Its not working, I tried the above code, it keep on saying iteration at 0. Its not changing. Can you verify it once. – user_12 Dec 07 '20 at 18:44
0

If you receive many requests and the processing time is large, parallelizing the work across multiple threads could potentially starve API clients. So make sure you limit the number of threads (or processes/executors - see below) per call to a small number.

You could use pyspark to distribute the file paths to executors (each runs as a process) that'll do the work, you can have multiple executors per machine and you can distribute across multiple machines.

Another option would be to use a thread pool via concurrent.futures, use max_workers to limit the number of threads per request.

and pass a concurrent collection to the thread's at launch, so they can "report" their progress by writing to this collection (you can wrap a regular collection with a lock, since Python doesn't provide spin-lock based concurrent collections).

Danny Varod
  • 17,324
  • 5
  • 69
  • 111
  • Can you provide a workable example based on my example in my question so that I can use as a base? I am not very familiar with the workflow? – user_12 Dec 11 '20 at 11:06