I'm not 100% sure I understood, but would something like this work?
import asyncio

# context['jobs'] is assumed to be a shared dict that outlives this call,
# e.g. populated at startup with context = {'jobs': {}}
async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['iteration'] = 0

    async def do_work_inner(file):
        # do the work on the file here
        job_info['iteration'] += 1
        await asyncio.sleep(0.5)

    tasks = [do_work_inner(file) for file in iter_over]
    job_info['status'] = 'inprogress'
    await asyncio.gather(*tasks)
    jobs[job_key]['status'] = 'done'
This will run all of the work on the files in parallel*. Keep in mind that in this case job_info['iteration'] is mostly meaningless: since all the tasks start together, they all increment the counter right away, long before any real work finishes.
- This is async-parallel, meaning it isn't true parallelism; the event loop just constantly jumps from one task to another while each one waits.
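If you do want the counter to mean something in this IO-bound version, one option (a minimal sketch, keeping the same sleep as a stand-in for the real work) is to increment only after each task's work completes:

async def do_work_inner(file):
    # the await stands in for the real IO work on the file
    await asyncio.sleep(0.5)
    # incrementing *after* the await means the counter
    # tracks files actually finished, not files started
    job_info['iteration'] += 1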
Pay attention: it really matters what kind of work you actually want to perform on the files. If it's a CPU-bound job (calculations, analysis, etc.) as opposed to a mainly IO-bound job like web calls, then this is the wrong solution and needs to be tweaked a bit; if so, let me know and I'll try to update it.
Edit: here is an updated version for CPU-bound work; progress now shows files completed.
This is a relatively complete example, just without the actual server.
import time
import asyncio
import random
from concurrent.futures import ProcessPoolExecutor

jobs = {}
context = {}
executor = ProcessPoolExecutor()

def do_work_per_file(file, file_number):
    # CPU-bound work here; note this function is not async,
    # it runs in a separate process via the executor
    # do the work on the file here
    print(f'Starting work on file {file_number}')
    time.sleep(random.randint(1, 10) / 10)
    return file_number

async def do_work(job_key, files=None):
    iter_over = files if files else range(15)
    jobs = context['jobs']
    job_info = jobs[job_key]
    job_info['completed'] = 0
    loop = asyncio.get_running_loop()
    # enumerate yields (index, item), so file_number comes first
    tasks = [loop.run_in_executor(executor, do_work_per_file, file, file_number)
             for file_number, file in enumerate(iter_over)]
    job_info['status'] = 'inprogress'
    for completed_job in asyncio.as_completed(tasks):
        print(f'Finished work on file {await completed_job}')
        job_info['completed'] += 1
        print('Current job status is', job_info)
    jobs[job_key]['status'] = 'done'
    print('Current job status is', job_info)

if __name__ == '__main__':
    context['jobs'] = jobs
    jobs['abc'] = {}
    asyncio.run(do_work('abc'))
The output is:
Starting work on file 0
Starting work on file 1
Starting work on file 2
Starting work on file 3
Starting work on file 4
Starting work on file 5
Starting work on file 6
Starting work on file 7
Starting work on file 8
Starting work on file 9
Starting work on file 10
Starting work on file 11
Starting work on file 12
Starting work on file 13
Starting work on file 14
Finished work on file 1
Current job status is {'completed': 1, 'status': 'inprogress'}
Finished work on file 7
Current job status is {'completed': 2, 'status': 'inprogress'}
Finished work on file 9
Current job status is {'completed': 3, 'status': 'inprogress'}
Finished work on file 12
Current job status is {'completed': 4, 'status': 'inprogress'}
Finished work on file 11
Current job status is {'completed': 5, 'status': 'inprogress'}
Finished work on file 13
Current job status is {'completed': 6, 'status': 'inprogress'}
Finished work on file 4
Current job status is {'completed': 7, 'status': 'inprogress'}
Finished work on file 14
Current job status is {'completed': 8, 'status': 'inprogress'}
Finished work on file 0
Current job status is {'completed': 9, 'status': 'inprogress'}
Finished work on file 6
Current job status is {'completed': 10, 'status': 'inprogress'}
Finished work on file 2
Current job status is {'completed': 11, 'status': 'inprogress'}
Finished work on file 3
Current job status is {'completed': 12, 'status': 'inprogress'}
Finished work on file 8
Current job status is {'completed': 13, 'status': 'inprogress'}
Finished work on file 5
Current job status is {'completed': 14, 'status': 'inprogress'}
Finished work on file 10
Current job status is {'completed': 15, 'status': 'inprogress'}
Current job status is {'completed': 15, 'status': 'done'}
Basically, what changed is that the work on the files is now handed to a process pool. Because it runs in separate processes, CPU-intensive work will not block your event loop or stop you from querying the status of the job.
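To make the "querying the status" part concrete, here is a minimal sketch of how this could be wired into a server. I'm assuming aiohttp purely for illustration, and the endpoint paths and job-key scheme are made up; jobs, context, and do_work are as defined in the example above:

import asyncio
from aiohttp import web  # assumption: aiohttp chosen just for this sketch

async def start_job(request):
    # register a job and kick off do_work *without* awaiting it,
    # so the request returns immediately while the work continues
    job_key = str(len(jobs))  # hypothetical key scheme, just for the sketch
    jobs[job_key] = {}
    asyncio.create_task(do_work(job_key))
    return web.json_response({'job_key': job_key})

async def job_status(request):
    # the event loop stays free, so this responds even while a job runs
    return web.json_response(jobs.get(request.match_info['job_key'], {}))

context['jobs'] = jobs
app = web.Application()
app.add_routes([web.post('/jobs', start_job),
                web.get('/jobs/{job_key}', job_status)])
# web.run_app(app)  # uncomment to actually serve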