I am new to interprocess communication and I am trying to understand the usage of os.pipe
and os.fork
with each other in Python.
In the code below, If I uncomment the lines "Broken Pipe" error comes otherwise it is working fine.
Idea is to have a SIGCHLD handler when child process exits and increment respective counters when child only function (run_child) and parent only function (sigchld_handler) execute. Since forked process will have its own version of memory and changes will not reflect in parent process, attempt is to let child process send message to parent process via pipe and let parent process update counter.
import os
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
self.rd , self.wr = os.pipe()
print self.rd , self.wr
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
print "Main run count : (parent) ", self.parent
#rf = os.fdopen(self.rd, 'r')
#self.child = int(rf.read())
#rf.close()
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
wr = os.fdopen(self.wr,'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
Interestingly error comes after first few iterations. Can somebody please explain why the error is coming and what should I do to solve this.
EDIT 1: There are a couple of similar examples: ex1 , ex2 , ex3 . I have actually used them only to learn but in my case, I am extending the examples to run in a loop to act more like a producer/consumer queue. I understand it might not be good approach as multiprocess/Queue modules are available in Python but I want to understand the mistake I am making here.
EDIT 2 (solution):
Based on @S.kozlov's answer, modifying code to create a new pipe for every communication. Here is the modified code.
import os
import pdb
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
os.close(self.wr)
print "Main run count : (parent) ", self.parent
rd = os.fdopen(self.rd, 'r')
self.child = int(rd.read())
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
os.close(self.rd)
wr = os.fdopen(self.wr, 'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self.rd , self.wr = os.pipe()
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
With this, output should come (something) like this.
Main run count (child) : 1
Running in child : 15752
C==> 1
Main run count : (parent) 1
Main run count (child) : 2
Running in child : 15753
C==> 2
Main run count : (parent) 2
Main run count (child) : 3
Running in child : 15754
C==> 3
Main run count : (parent) 3
Main run count (child) : 4
Running in child : 15755
C==> 4
Main run count : (parent) 4
Main run count (child) : 5
Running in child : 15756
C==> 5
Main run count : (parent) 5
Main run count (child) : 6
Running in child : 15757
C==> 6
Main run count : (parent) 6