I have run into a very odd issue that I can't explain when printing to a file from multiple processes (started with the subprocess module). Some of my output is slightly truncated, and some of it is missing entirely. I am using a slightly modified version of Alex Martelli's solution for thread-safe printing from the question "How do I get a thread safe print in Python 2.6?". The main difference is in the write method: to guarantee that output from the multiple processes writing to the same file is not interleaved, I buffer the output and only write it when I see a newline. The class, saved as threadSafeFile.py:
```python
import sys
import threading

tls = threading.local()

class ThreadSafeFile(object):
    """
    @author: Alex Martelli
    @see: https://stackoverflow.com/questions/3029816/how-do-i-get-a-thread-safe-print-in-python-2-6
    @summary: Allows for safe printing of output of multi-threaded programs to stdout.
    """
    def __init__(self, f):
        self.f = f
        self.lock = threading.RLock()
        self.nesting = 0
        self.dataBuffer = ""

    def _getlock(self):
        self.lock.acquire()
        self.nesting += 1

    def _droplock(self):
        # Release the lock as many times as it was acquired.
        nesting = self.nesting
        self.nesting = 0
        for i in range(nesting):
            self.lock.release()

    def __getattr__(self, name):
        # The print statement's softspace flag is kept per thread.
        if name == 'softspace':
            return tls.softspace
        else:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        if name == 'softspace':
            tls.softspace = value
        else:
            return object.__setattr__(self, name, value)

    def write(self, data):
        self._getlock()
        self.dataBuffer += data
        # The print statement writes the trailing newline as a separate
        # write() call, so only complete lines reach the underlying file.
        if data == '\n':
            self.f.write(self.dataBuffer)
            self.f.flush()
            self.dataBuffer = ""
        self._droplock()

    def flush(self):
        self.f.flush()
```
Note that reproducing this takes either a lot of time or a machine with multiple processors or cores. I ran the offending program in my test suite about 7000 times on a single-processor machine before it reported a failure. The demonstration program below, which reproduces the issue I have been seeing in my test suite, also appears to work on a single-processor machine, but on a multicore or multiprocessor machine it fails consistently.
The following program demonstrates the issue. It is somewhat more involved than I wanted, but I wanted to preserve as much of the behavior of my real programs as possible.
Code for process 1, main.py:
```python
import subprocess, sys, socket, time, random
from threadSafeFile import ThreadSafeFile

sys.stdout = ThreadSafeFile(sys.__stdout__)
usage = "python main.py nprocs niters"
workerFilename = "/path/to/worker.py"

def startMaster(n, iters):
    host = socket.gethostname()
    for i in xrange(n):
        #set up ~synchronization between master and worker
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind((host, 0))
        sock.listen(1)
        socketPort = sock.getsockname()[1]
        cmd = 'ssh %s python %s %s %d %d %d' % \
              (host, workerFilename, host, socketPort, i, iters)
        proc = subprocess.Popen(cmd.split(), shell=False, stdout=None, stderr=None)
        conn, addr = sock.accept()
        #wait for worker process to start
        conn.recv(1024)
        for j in xrange(iters):
            #do very bursty i/o
            for k in xrange(iters):
                print "master: %d iter: %d message: %d" % (n, i, j)
            #sleep for some amount of time between .01s and .5s
            time.sleep(1 * (random.randint(1, 50) / float(100)))
        #wait for worker to finish
        conn.recv(1024)
        sock.close()
        proc.kill()

def main(nprocs, niters):
    startMaster(nprocs, niters)

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print usage
        sys.exit(1)
    nprocs = int(sys.argv[1])
    niters = int(sys.argv[2])
    main(nprocs, niters)
```
Code for process 2, worker.py:
```python
import sys, socket, time, random
from threadSafeFile import ThreadSafeFile

usage = "python worker.py host port id iters"
sys.stdout = ThreadSafeFile(sys.__stdout__)

def main(host, port, n, iters):
    #tell master to start
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, port))
    sock.send("begin")
    for i in xrange(iters):
        #do bursty i/o
        for j in xrange(iters):
            print "worker: %d iter: %d message: %d" % (n, i, j)
        #sleep for some amount of time between .01s and .5s
        time.sleep(1 * (random.randint(1, 50) / float(100)))
    #tell master we are done
    sock.send("done")
    sock.close()

if __name__ == "__main__":
    if len(sys.argv) != 5:
        print usage
        sys.exit(1)
    host = sys.argv[1]
    port = int(sys.argv[2])
    n = int(sys.argv[3])
    iters = int(sys.argv[4])
    main(host, port, n, iters)
```
When testing, I ran main.py as follows:

```shell
python main.py 1 75 > main.out
```
The resulting file should contain 75*75*2 = 11250 lines (75*75 from the master and 75*75 from the worker), each of the form:

```
(master|worker): %d iter: %d message: %d
```
Most of the time the file is 20-30 lines short, although I have occasionally seen it with the correct number of lines. On closer inspection, even in those rare successes some lines are truncated, for example:

```
ter: %d message: %d
```
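A small checker can make each failure easier to quantify by reporting how many lines are missing and which ones are malformed. The names expected_line_count and check_output are hypothetical, and the code is Python 3. The regular expression is the line format given above with each %d replaced by \d+. The expected count generalizes 75*75*2 to nprocs rounds.

```python
import re

# The line format described above, with each %d replaced by \d+.
LINE_RE = re.compile(r"^(master|worker): \d+ iter: \d+ message: \d+$")


def expected_line_count(nprocs, iters):
    """Each of the nprocs rounds prints iters*iters master lines and
    iters*iters worker lines."""
    return 2 * nprocs * iters * iters


def check_output(lines, nprocs, iters):
    """Return (number_of_missing_lines, list_of_malformed_lines)."""
    lines = [line.rstrip("\n") for line in lines]
    malformed = [line for line in lines if not LINE_RE.match(line)]
    missing = expected_line_count(nprocs, iters) - len(lines)
    return missing, malformed
```

For the run above, open main.out and pass the file object as check_output(f, 1, 75). A truncated line such as the one shown fails the regular expression, and so does any line that has been spliced together with another one.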
Another interesting detail: when I start the ssh process with multiprocessing instead of subprocess, the program behaves as intended. Some may ask why I bother with subprocess when multiprocessing works fine. Unfortunately, the academic in me really wants to know why this behaves abnormally. Any thoughts or insights would be much appreciated. Thanks.
Edit: Ben, I understand that ThreadSafeFile uses a different lock in each process, but I need it in my larger project for two reasons:
1) Each process may have multiple threads writing to stdout, even though this example does not, so I need to guarantee safety both at the thread level and at the process level.
2) If I don't ensure that the buffer ends with a '\n' whenever stdout is flushed, there is a possible execution trace in which process 1 writes its buffer to the file without a trailing '\n' and then process 2 writes its buffer. The lines are then interleaved, which is not what I want.
I also understand that this mechanism restricts what can be printed. At this stage of the project that restrictiveness is acceptable; once I can guarantee correctness, I can start to relax the restrictions.
Your comment about moving the lock inside the if data == '\n' check is incorrect. If the lock goes inside the conditional, ThreadSafeFile is no longer thread safe in the general case: if any thread can append to the data buffer, there is a race condition at dataBuffer += data, because that is not an atomic operation. Perhaps your comment applies only to this example, which has a single thread per process, but in that case we don't need a lock at all.
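Disassembling a method that contains the same statement shows why the concatenation needs protection. CPython compiles self.dataBuffer += data into three separate steps: an attribute load (LOAD_ATTR), an addition, and an attribute store (STORE_ATTR). Nothing makes this read-modify-write atomic, so if another thread updates dataBuffer between the load and the store, that update can be lost. The Buffered class and the opnames helper are hypothetical, and the code is Python 3. Exact opcode names vary between CPython versions: the addition is INPLACE_ADD before 3.11 and BINARY_OP from 3.11 on.

```python
import dis


class Buffered(object):
    """Hypothetical stand-in for the buffering part of ThreadSafeFile."""

    def __init__(self):
        self.dataBuffer = ""

    def append(self, data):
        self.dataBuffer += data   # read, concatenate, write back


def opnames(func):
    """Return the names of the bytecode instructions CPython generated."""
    return [ins.opname for ins in dis.get_instructions(func)]


if __name__ == "__main__":
    dis.dis(Buffered.append)
```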
Regarding OS-level locks, my understanding was that on a Unix platform multiple programs can safely write to the same file provided that each write is smaller than the size of the internal buffer. Shouldn't the OS take care of all the necessary locking for me in this case?
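For reference, the POSIX specification of write() guarantees that writes of at most PIPE_BUF bytes are not interleaved only for pipes and FIFOs. For regular files, it specifies something different: when O_APPEND is set, the offset is moved to the end of the file before each write, with no intervening modification. The shell's > redirection (as in python main.py 1 75 > main.out) opens the file without O_APPEND, whereas >> opens it with O_APPEND.

The sketch below is one explicit, cooperative alternative, not what the programs above do. It defines a hypothetical append_line helper (Python 3, Unix only) that opens the file with O_APPEND and holds an advisory fcntl.flock lock while writing one complete line. The lock only excludes other writers that also take it.

```python
import fcntl
import os
import tempfile


def append_line(path, line):
    """Append one complete line to path (hypothetical helper, Unix only).

    O_APPEND makes each write() go to the current end of the file, and the
    exclusive flock() keeps other cooperating writers out until the whole
    line is written, even if os.write() returns a short count.
    """
    data = (line.rstrip("\n") + "\n").encode()
    fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)   # advisory, whole-file lock
        try:
            view = memoryview(data)
            while view:
                written = os.write(fd, view)
                view = view[written:]
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)
    finally:
        os.close(fd)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "main.out")
        append_line(path, "master: 1 iter: 0 message: 0")
        append_line(path, "worker: 0 iter: 0 message: 0")
        with open(path) as f:
            print(f.read(), end="")
```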