
I have a simple Flask web app that makes many HTTP requests to an external service when a user pushes a button. On the client side I have an AngularJS app.

The server side of the code looks like this (using multiprocessing.dummy):

from multiprocessing.dummy import Pool  # thread-based Pool with the multiprocessing API

worker = MyWorkerClass()
pool = Pool(processes=10)
result_objs = [pool.apply_async(worker.do_work, (q,))
               for q in queries]
pool.close()  # No more tasks will be submitted
pool.join()   # Wait for all tasks to finish
errors = not all(obj.successful() for obj in result_objs)
# Extract results only from successful tasks
items = [obj.get() for obj in result_objs if obj.successful()]

As you can see, I'm using apply_async because I want to inspect each task later and extract its result only if the task didn't raise an exception.

I understand that, to show a progress bar on the client side, I need to publish the number of completed tasks somewhere, so I made a simple view like this:

@app.route('/api/v1.0/progress', methods=['GET'])
def view_progress():
    return jsonify(dict(progress=session['progress']))

That will show the content of a session variable. Now, during the process, I need to update that variable with the number of completed tasks (the total number of tasks to complete is fixed and known).

Any ideas about how to do that? Am I working in the right direction?

I've seen similar questions on SO, like this one, but I'm not able to adapt the answer to my case.

Thank you.

raben

3 Answers

For interprocess communication you can use a multiprocessing.Queue, and your workers can put_nowait tuples with progress information on it while doing their work. Your main process can then update whatever your view_progress is reading until all results are ready.

A bit like in this example usage of a Queue, with a few adjustments:

In the writers (workers) I'd use put_nowait instead of put because working is more important than waiting to report that you are working (but perhaps you judge otherwise and decide that informing the user is part of the task and should never be skipped).

The example just puts strings on the queue; I'd use a collections.namedtuple for more structured messages. On tasks with many steps, this lets you raise the resolution of your progress report and report more to the user.
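As a rough illustration of the adjustments above, here is a minimal sketch. It assumes the thread-based pool from the question, so a plain queue.Queue is used (multiprocessing.Queue offers the same put_nowait/get interface). The Progress tuple, do_work body, and run_all helper are hypothetical names made up for this example, not part of the original code.

```python
import queue
from collections import namedtuple
from multiprocessing.dummy import Pool  # thread-based pool, as in the question

# Hypothetical message type; the field names are illustrative only.
Progress = namedtuple("Progress", ["task_id", "ok"])


def do_work(task_id, progress_queue):
    """Stand-in for the real HTTP request; reports once when it finishes."""
    ok = True
    try:
        return task_id * 2  # placeholder for the actual work
    except Exception:
        ok = False
        raise
    finally:
        try:
            progress_queue.put_nowait(Progress(task_id, ok))
        except queue.Full:
            pass  # never block the real work on a progress report


def run_all(queries):
    queries = list(queries)
    progress_queue = queue.Queue()
    pool = Pool(processes=4)
    results = [pool.apply_async(do_work, (q, progress_queue)) for q in queries]
    pool.close()

    completed = 0
    while completed < len(queries):
        progress_queue.get()  # blocks until some worker reports
        completed += 1
        # Here the main thread would update whatever view_progress reads.

    pool.join()
    return completed, [r.get() for r in results]
```

Calling run_all(range(5)) would run five placeholder tasks and count five progress messages. Because the workers report in a finally clause, a failed task still counts as completed; whether that is what you want depends on how you define progress.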

Chris Wesseling

In general the approach you are taking is okay, I do it in a similar way.

To calculate the progress you can use an auxiliary function that counts the completed tasks:

def get_progress(result_objs):
    done = 0
    errors = 0
    for r in result_objs:
        if r.ready():
            done += 1
            if not r.successful():
                errors += 1
    return (done, errors)

Note that as a bonus this function returns how many of the "done" tasks ended in errors.
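To see what the helper reports, here is a small sketch that runs it against a thread pool in which some tasks fail on purpose. The flaky function and the pool size are assumptions made for the example only.

```python
from multiprocessing.dummy import Pool  # thread-based pool, as in the question


def get_progress(result_objs):
    """Count finished tasks and how many of those raised an exception."""
    done = 0
    errors = 0
    for r in result_objs:
        if r.ready():
            done += 1
            if not r.successful():
                errors += 1
    return (done, errors)


def flaky(n):
    """Hypothetical task: fails for multiples of 3."""
    if n % 3 == 0:
        raise ValueError("simulated failure for %d" % n)
    return n


pool = Pool(4)
results = [pool.apply_async(flaky, (n,)) for n in range(6)]
pool.close()
pool.join()  # after join, every AsyncResult is ready

done, errors = get_progress(results)
```

In a real view you would call get_progress while the tasks are still running, so done would grow from 0 up to the total; here the pool is joined first only to keep the example deterministic.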

The big problem is for the /api/v1.0/progress route to find the array of AsyncResult objects.

Unfortunately, AsyncResult objects cannot be serialized to a session, so that option is out. If your application supports a single set of async tasks at a time, then you can just store this array as a global variable. If you need to support multiple clients, each with a different set of async tasks, then you will need to figure out a strategy to keep client session data in the server.

I implemented the single client solution as a quick test. My view functions are as follows:

results = None

@app.route('/')
def index():
    global results
    results = [pool.apply_async(do_work) for n in range(20)]
    return render_template('index.html')

@app.route('/api/v1.0/progress')
def progress():
    global results
    total = len(results)
    done, errored = get_progress(results)
    return jsonify({'total': total, 'done': done, 'errored': errored})

I hope this helps!

Miguel Grinberg
  • Thank you, this is the most complete answer so far. In the 'multiple client' scenario do you think I could store the state of every client's process in a redis key for example? – raben Mar 16 '14 at 22:39
  • The biggest problem you have is that you can't serialize `AsyncResult` objects and store them away to a database, so it seems to me the best approach is to use a memory structure as storage. Let's say you come up with a randomly generated key for each client that starts a set of jobs. Then the global `results` becomes a dictionary with these keys and the values being the arrays of `AsyncResult` objects. If you run a single web server process then this works fine, if you have multiple processes then it won't work. – Miguel Grinberg Mar 16 '14 at 22:57
  • To use multiple server processes you may need to have a single helper process that owns the worker pool and the `results` dictionary. The web server processes can communicate with it using sockets/pipes/etc. and route all queries through that process. – Miguel Grinberg Mar 16 '14 at 22:59

I think you should be able to update the number of completed tasks using multiprocessing.Value and multiprocessing.Lock.

In your main code, use:

processes=multiprocessing.Value('i', 10)
lock=multiprocessing.Lock()

And then, when you call worker.do_work, pass the lock object and the value to it:

worker.do_work(lock, processes)

In your worker.do_work code, decrease "processes" by one when the code is finished:

lock.acquire()
processes.value-=1
lock.release()

Now, "processes.value" should be accessible from your main code, and be equal to the number of remaining processes. Make sure you acquire the lock before accessing processes.value, and release it afterwards.

Jeff
  • `Value()` already has an associated `RLock`: `with processes.get_lock(): processes.value -= 1` – jfs Mar 14 '14 at 14:49
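Putting the answer and the comment together, a minimal sketch might look like this. It assumes the thread-based pool from the question, where the Value can be passed to tasks directly; with a real process pool a Value generally has to be shared through inheritance (for example via a pool initializer) rather than as a task argument. The do_work body and the run helper are placeholders invented for the example.

```python
import multiprocessing
from multiprocessing.dummy import Pool  # thread-based pool, as in the question


def do_work(query, remaining):
    """Placeholder task; decrements the shared counter when it finishes."""
    try:
        return query.upper()  # stand-in for the real HTTP request
    finally:
        # Value() carries its own lock, so no separate Lock is needed.
        with remaining.get_lock():
            remaining.value -= 1


def run(queries):
    remaining = multiprocessing.Value('i', len(queries))
    pool = Pool(processes=4)
    results = [pool.apply_async(do_work, (q, remaining)) for q in queries]
    pool.close()
    pool.join()
    with remaining.get_lock():
        left = remaining.value
    return left, [r.get() for r in results]
```

While the tasks are running, the main code (or a progress view) could read remaining.value under the same lock to compute how many tasks are still outstanding.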