I've read in some sources that print is not thread-safe and that the workaround is to use sys.stdout.write instead, but that doesn't work for me either: writes to stdout still aren't atomic.

Here's a short example (I called the file parallelExperiment.py):

   import os
   import sys
   from multiprocessing import Pool

   def output(msg):
       msg = '%s%s' % (msg, os.linesep)
       sys.stdout.write(msg)

   def func(input):
       output(u'pid:%d got input \"%s\"' % (os.getpid(), str(input)))

   def executeFunctionInParallel(funcName, inputsList, maxParallelism):
       output(u'Executing function %s on input of size %d with maximum parallelism of %d' % (
           funcName.__name__, len(inputsList), maxParallelism))
       parallelismPool = Pool(processes=maxParallelism)
       executeBooleanResultsList = parallelismPool.map(funcName, inputsList)
       parallelismPool.close()
       output(u'Function %s executed on input of size %d  with maximum parallelism of %d' % (
           funcName.__name__, len(inputsList), maxParallelism))
       # if all parallel executions executed well - the boolean results list should all be True
       return all(executeBooleanResultsList)

   if __name__ == "__main__":
       inputsList = [str(i) for i in range(20)]
       executeFunctionInParallel(func, inputsList, 4)

Look at the output:

i. Output of running python parallelExperiment.py (note that the word "pid" is garbled in some lines):

Executing function func on input of size 20 with maximum parallelism of 4
ppid:2240 got input "0"
id:4960 got input "2"
pid:4716 got input "4"
pid:4324 got input "6"
ppid:2240 got input "1"
id:4960 got input "3"
pid:4716 got input "5"
pid:4324 got input "7"
ppid:4960 got input "8"
id:2240 got input "10"
pid:4716 got input "12"
pid:4324 got input "14"
ppid:4960 got input "9"
id:2240 got input "11"
pid:4716 got input "13"
pid:4324 got input "15"
ppid:4960 got input "16"
id:2240 got input "18"
ppid:2240 got input "19"
id:4960 got input "17"
Function func executed on input of size 20  with maximum parallelism of 4

ii. Output of running python parallelExperiment.py > parallelExperiment.log, i.e. with stdout redirected to the file parallelExperiment.log (note that the lines are out of order: a message should be printed before and after the call to executeFunctionInParallel, which runs func in parallel):

pid:3244 got input "4"
pid:3244 got input "5"
pid:3244 got input "12"
pid:3244 got input "13"
pid:240 got input "0"
pid:240 got input "1"
pid:240 got input "8"
pid:240 got input "9"
pid:240 got input "16"
pid:240 got input "17"
pid:1268 got input "2"
pid:1268 got input "3"
pid:1268 got input "10"
pid:1268 got input "11"
pid:1268 got input "18"
pid:1268 got input "19"
pid:3332 got input "6"
pid:3332 got input "7"
pid:3332 got input "14"
pid:3332 got input "15"
Executing function func on input of size 20 with maximum parallelism of 4
Function func executed on input of size 20  with maximum parallelism of 4
Tal Barda
The messed-up result is what is expected of multi-threading, though. You spawn threads that each have their own timeline. It is not really uncommon for bash to mess up the string *pid*. It's mostly, as you said, that it doesn't handle multiple writers well: it just prints what it gets and sometimes garbles overlapping writes. – Jul 27 '14 at 09:25

2 Answers

That happens because multiprocessing.Pool uses subprocesses rather than threads, so you need explicit synchronization between processes. See the example at the link; it addresses your issue.

import os
import sys
from multiprocessing import Pool, Lock

lock = Lock()

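def output(msg):
    msg = '%s%s' % (msg, os.linesep)
    with lock:
        sys.stdout.write(msg)
        # Flush while holding the lock so buffered text is not emitted later, out of order.
        sys.stdout.flush()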

def func(input):
    output(u'pid:%d got input \"%s\"' % (os.getpid(), str(input)))
    # Report success so that all(executeBooleanResultsList) can be True.
    return True

def executeFunctionInParallel(funcName, inputsList, maxParallelism):
    output(u'Executing function %s on input of size %d with maximum parallelism of %d' % (
        funcName.__name__, len(inputsList), maxParallelism))
    parallelismPool = Pool(processes=maxParallelism)
    executeBooleanResultsList = parallelismPool.map(funcName, inputsList)
    parallelismPool.close()
    parallelismPool.join()
    output(u'Function %s executed on input of size %d  with maximum parallelism of %d' % (
        funcName.__name__, len(inputsList), maxParallelism))
    # if all parallel executions executed well - the boolean results list should all be True
    return all(executeBooleanResultsList)

if __name__ == "__main__":
    inputsList = [str(i) for i in range(20)]
    executeFunctionInParallel(func, inputsList, 4)
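
One caveat about the module-level lock above: it is only shared with the workers when they are forked from the parent. Where multiprocessing starts workers by spawning a fresh interpreter (Windows, for example), each worker re-imports the module and creates its own, unrelated lock. Below is a minimal sketch of one way around that, assuming it is acceptable to hand the lock to every worker through the Pool initializer. The names init_worker, _stdout_lock and execute_in_parallel are hypothetical and not part of the code above.

```python
import os
import sys
from multiprocessing import Lock, Pool

# Hypothetical module-level slot; every process fills it in via init_worker().
_stdout_lock = None


def init_worker(shared_lock):
    """Runs once in each worker process and stores the lock passed by the parent."""
    global _stdout_lock
    _stdout_lock = shared_lock


def output(msg):
    # Write and flush under the lock so the whole line leaves this process at once.
    # "\n" is used instead of os.linesep because a text-mode stdout already
    # translates newlines for the platform.
    with _stdout_lock:
        sys.stdout.write(msg + "\n")
        sys.stdout.flush()


def func(value):
    output('pid:%d got input "%s"' % (os.getpid(), value))
    return True


def execute_in_parallel(function, inputs, max_parallelism):
    lock = Lock()
    init_worker(lock)  # the parent writes through output() as well
    output("Executing function %s on input of size %d" % (function.__name__, len(inputs)))
    # The same lock object is handed to every worker when the pool starts.
    pool = Pool(processes=max_parallelism, initializer=init_worker, initargs=(lock,))
    try:
        results = pool.map(function, inputs)
    finally:
        pool.close()
        pool.join()
    output("Function %s executed on input of size %d" % (function.__name__, len(inputs)))
    return all(results)


if __name__ == "__main__":
    execute_in_parallel(func, [str(i) for i in range(20)], 4)
```

Flushing inside the lock should also help with the ordering seen when stdout is redirected to a file: a redirected stdout is typically block-buffered, so without the flush the parent's two messages can sit in its buffer until the process exits.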
Dmitry Vakhrushev

If you want to avoid locking and are happy to drop down to a lower-level interface, you can get POSIX O_APPEND behaviour with os.open and os.write (if your system supports it); see also "Is file append atomic in UNIX?".
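
A minimal sketch of that idea, assuming a POSIX system and a log file rather than the console. The helper name append_line and the log file location are hypothetical. Each message goes out in a single os.write call on a descriptor opened with O_APPEND, so the kernel positions every write at the current end of the file. Whether concurrent writes can still interleave depends on the operating system and the filesystem, so treat this as an illustration, not a guarantee.

```python
import functools
import os
import tempfile
from multiprocessing import Pool


def append_line(path, msg):
    """Append one line to path using a single os.write call on an O_APPEND descriptor."""
    data = (msg + "\n").encode("utf-8")
    fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
    try:
        # os.write returns the number of bytes written; this sketch assumes
        # a short line is accepted in one piece and does not retry.
        os.write(fd, data)
    finally:
        os.close(fd)


def func(path, value):
    append_line(path, 'pid:%d got input "%s"' % (os.getpid(), value))
    return True


if __name__ == "__main__":
    # Hypothetical log location, chosen only for this example.
    log_path = os.path.join(tempfile.gettempdir(), "parallelExperiment.log")
    pool = Pool(processes=4)
    try:
        results = pool.map(functools.partial(func, log_path), [str(i) for i in range(20)])
    finally:
        pool.close()
        pool.join()
    print(all(results))
```

No lock is involved here; the ordering of lines from different processes is still arbitrary, only the integrity of each line is what the O_APPEND write is meant to protect.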

Colin Phipps