2

cmd is a function that processes the argument x and prints the output to stdout. For example, it may be

def cmd(x):
  print(x)

A serial program calling cmd() looks like the following.

for x in array:
  cmd(x)

To speed up the program, I'd like to run it in parallel. The stdout output can be out of order, but the output for a single x must not be interleaved with the output for another x.

There are various ways to implement this in Python. I came up with something like this.

from joblib import Parallel, delayed
Parallel(n_jobs=100)(delayed(cmd)(i) for i in range(100))

Is this the best way to implement this in python in terms of code simplicity/readability and efficiency?

Also, the above code runs fine on Python 3, but on Python 2 I get the following warning. Is it a problem that may cause errors?

/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/joblib/externals/loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value
  "increase its maximal value", RuntimeWarning)

Thanks.

user1424739

3 Answers

1

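If you're using Python 3, you can use concurrent.futures from the standard library instead.

Consider the following usage:

import concurrent.futures

# Up to 100 worker processes; leaving the with block waits for all submitted tasks.
with concurrent.futures.ProcessPoolExecutor(100) as executor:
    for x in array:
        executor.submit(cmd, x)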
Samuel
  • Just to be sure. Does this guarantee the print results don't interleave each other? – user1424739 Nov 20 '18 at 14:09
  • If you're using Python 3 then yes. But a better solution may be to use the [logging](https://docs.python.org/3/library/logging.html) module – Samuel Nov 20 '18 at 14:40
  • Would the results be available immediately? When I use the code in production, the results don't print immediately, and I am not sure why. It could be an I/O buffering problem or something related to ProcessPoolExecutor. Does flushing the I/O buffer cause problems? – user1424739 Nov 20 '18 at 15:07
  • Is your `cmd` method doing anything else besides calling `print`? – Samuel Nov 20 '18 at 15:39
  • Yes. There is some code that does a computation, then the results are printed. – user1424739 Nov 20 '18 at 15:54
  • I guess you should check how much time computation takes. – Samuel Nov 20 '18 at 16:18
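One way to address both concerns raised in these comments (interleaving and delayed output) is to have the workers return their text instead of printing it, and let the parent process print each result as it completes. The following is a minimal sketch, assuming Python 3 and assuming cmd can be changed to return a string; run_all and its executor_cls parameter are hypothetical names introduced only for this illustration.

```python
import concurrent.futures


def cmd(x):
    """Stand-in for the real work: compute something and return the text to print."""
    return "result for {}".format(x)


def run_all(items, max_workers=4,
            executor_cls=concurrent.futures.ProcessPoolExecutor):
    """Run cmd on every item and print each result as soon as it is ready."""
    printed = []
    with executor_cls(max_workers=max_workers) as executor:
        futures = [executor.submit(cmd, x) for x in items]
        # as_completed yields futures in completion order, not submission order.
        for future in concurrent.futures.as_completed(futures):
            text = future.result()
            # Only the parent writes to stdout, so outputs cannot interleave;
            # flush=True avoids waiting for the stdout buffer to fill.
            print(text, flush=True)
            printed.append(text)
    return printed


if __name__ == "__main__":
    run_all(range(10))
```

Because a single process does all the printing, no lock is needed, and max_workers caps how many cmd calls run at once.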
1

You can use threading from the standard library: https://docs.python.org/3/library/threading.html

import threading

def cmd(x):
    lock.acquire(blocking=True)
    print(x)
    lock.release()

lock = threading.Lock()

for i in range(100):
    t = threading.Thread(target=cmd, args=(i,))
    t.start()

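Using a lock guarantees that the code between lock.acquire() and lock.release() is executed by only one thread at a time. print is often described as thread-safe in Python 3, but it can write the text and the trailing newline as separate steps, so without a lock, lines from different threads may occasionally run together; keeping the lock around print is the safer choice. If you have any state shared between threads (an object they modify), you need a lock for that as well.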

  • How can I make sure that no more than n `cmd()` calls run concurrently? – user1424739 Nov 20 '18 at 14:10
  • With a simple approach like mine there is no way. Parallel processing is a complex subject. For more advanced threading solutions, start here: https://stackoverflow.com/questions/2846653/how-to-use-threading-in-python – Maiki Bodhisattva Nov 20 '18 at 14:20
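For the question about capping concurrency at n, one option from the standard library is a thread pool with a fixed number of workers. This is a minimal sketch rather than part of the answer above, assuming Python 3; run_limited, print_lock, and the sleep standing in for real work are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

print_lock = threading.Lock()


def cmd(x):
    time.sleep(0.01)  # placeholder for the real work
    # Hold the lock while printing so lines from different threads stay intact.
    with print_lock:
        print(x)
    return x


def run_limited(items, n):
    """Run cmd on every item with at most n calls running at the same time."""
    with ThreadPoolExecutor(max_workers=n) as executor:
        # executor.map returns results in input order; list() collects them all.
        return list(executor.map(cmd, items))


if __name__ == "__main__":
    run_limited(range(100), n=8)
```

The pool never starts more than n worker threads, so the limit holds no matter how many items are submitted.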
0

I would approach the issue in the question with the following code (assuming we are talking about CPU-bound operations):

import multiprocessing as mp
import random


def cmd(value):
    # some CPU heavy calculation
    for dummy in range(10 ** 8):
        random.random()
    # result
    return "result for {}".format(value)


if __name__ == '__main__':
    data = [val for val in range(10)]
    pool = mp.Pool(4)  # 4 worker processes (the number of CPU cores used)
    # map() blocks until all the data has been processed, then returns the results in input order
    result = pool.map(cmd, data)

    print(result)

Output:

['result for 0', 'result for 1', 'result for 2', 'result for 3', 'result for 4', 'result for 5', 'result for 6', 'result for 7', 'result for 8', 'result for 9']

EDIT - another implementation that gets each result immediately after it is calculated, using processes and queues instead of pool and map:

import multiprocessing
import random


def cmd(value, result_queue):
    # some CPU heavy calculation
    for dummy in range(10 ** 8):
        random.random()
    # result
    result_queue.put("result for {}".format(value))


if __name__ == '__main__':

    data = [val for val in range(10)]
    results = multiprocessing.Queue()

    LIMIT = 3  # maximum number of worker processes in flight at once
    counter = 0
    for val in data:
        counter += 1
        multiprocessing.Process(
            target=cmd,
            kwargs={'value': val, 'result_queue': results}
            ).start()
        if counter >= LIMIT:
            print(results.get())
            counter -= 1
    for dummy in range(counter):  # drain the results that are still pending
        print(results.get())

Output:

result for 0
result for 1
result for 2
result for 3
result for 4
result for 5
result for 7
result for 6
result for 8
result for 9
  • I'd like to print the results as soon as they become available (out-of-order is fine). Can `pool.map()` do so? – user1424739 Nov 20 '18 at 14:36
  • @user1424739 I guess it is impossible with `Pool.map()`. Check EDIT addition to the answer for another version of the code. –  Nov 20 '18 at 14:59
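While `Pool.map()` itself returns only after everything has finished, the same pool also offers `imap_unordered()`, which yields each result as soon as a worker produces it. A minimal sketch, assuming Python 3 (for the pool's with-statement support and `flush=True`) and using a trivial cmd in place of the CPU-heavy one:

```python
import multiprocessing as mp


def cmd(value):
    # Placeholder for the CPU-heavy calculation in the answer.
    return "result for {}".format(value)


def main():
    data = list(range(10))
    with mp.Pool(4) as pool:
        # imap_unordered yields results in completion order, one at a time.
        for result in pool.imap_unordered(cmd, data):
            # Printing happens only in the parent, so lines cannot interleave.
            print(result, flush=True)


if __name__ == "__main__":
    main()
```

The order of the printed lines can differ from run to run, which matches the out-of-order requirement in the question.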