I am new to interprocess communication and am trying to understand how to use os.pipe and os.fork together in Python.

In the code below, if I uncomment the commented-out lines in sigchld_handler, I get a "Broken pipe" error; otherwise it works fine.

The idea is to install a SIGCHLD handler that runs when the child process exits, and to increment separate counters when the child-only function (run_child) and the parent-only function (sigchld_handler) execute. Since a forked process has its own copy of memory and its changes are not reflected in the parent process, the child sends a message to the parent through a pipe and the parent updates the counter.
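As a minimal sketch of the premise above (not the code from this question), the following shows that a counter incremented in the forked child stays unchanged in the parent, while the value sent through the pipe does arrive. The function name child_reports_counter is hypothetical, and the sketch assumes Python 3 on a POSIX system, whereas the question's code uses Python 2 print statements.

```python
import os


def child_reports_counter():
    """Fork once; return (parent's own counter, value the child sent)."""
    counter = 0
    rd, wr = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: works on its own copy of `counter`.
        try:
            os.close(rd)                     # child only writes
            counter += 1
            os.write(wr, str(counter).encode("ascii"))
            os.close(wr)
        finally:
            os._exit(0)                      # never return into parent code

    # Parent: close its copy of the write end so reading can reach EOF.
    os.close(wr)
    data = b""
    while True:
        chunk = os.read(rd, 1024)
        if not chunk:                        # EOF: every write end is closed
            break
        data += chunk
    os.close(rd)
    os.waitpid(pid, 0)                       # reap the child
    return counter, int(data.decode("ascii"))


if __name__ == "__main__":
    print(child_reports_counter())
```

The parent's counter is still 0 after the child exits, and the only way the parent learns the child's value is by reading it from the pipe.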

import os
import signal
import time

class A(object):
    def __init__(self):
        self.parent  = 0
        self.child = 0
        self._child_pid = None

        self.rd , self.wr = os.pipe()
        print self.rd , self.wr
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
        print "Main run count : (parent) ", self.parent
        #rf = os.fdopen(self.rd, 'r')
        #self.child = int(rf.read())
        #rf.close()
        self._child_pid = None

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : " , os.getpid()
        wr = os.fdopen(self.wr,'w')
        text = "%s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()

a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break

Interestingly, the error appears only after the first few iterations. Can somebody please explain why the error occurs and what I should do to fix it?

EDIT 1: There are a couple of similar examples: ex1, ex2, ex3. I used them to learn, but in my case I am extending the examples to run in a loop so that the code acts more like a producer/consumer queue. I understand this might not be a good approach, since the multiprocessing and Queue modules are available in Python, but I want to understand the mistake I am making here.

EDIT 2 (solution):

Based on @S.kozlov's answer, I modified the code to create a new pipe for every communication. Here is the modified code.

import os
import signal
import time

class A(object):
    def __init__(self):
        self.parent  = 0
        self.child = 0
        self._child_pid = None
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
        os.close(self.wr)
        print "Main run count : (parent) ", self.parent
        rd = os.fdopen(self.rd, 'r')
        self.child = int(rd.read())
        rd.close()  # release the read end so descriptors do not accumulate
        self._child_pid = None

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : " , os.getpid()
        os.close(self.rd)
        wr = os.fdopen(self.wr, 'w')
        text = "%s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            self.rd , self.wr = os.pipe()
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()

a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break

With this, the output should look something like this.

Main run count (child) :  1
Running in child :  15752
C==> 1
Main run count : (parent)  1
Main run count (child) :  2
Running in child :  15753
C==> 2
Main run count : (parent)  2
Main run count (child) :  3
Running in child :  15754
C==> 3
Main run count : (parent)  3
Main run count (child) :  4
Running in child :  15755
C==> 4
Main run count : (parent)  4
Main run count (child) :  5
Running in child :  15756
C==> 5
Main run count : (parent)  5
Main run count (child) :  6
Running in child :  15757
C==> 6
Main run count : (parent)  6 
ViFI
  • It's working for me without error. I've run it many times. – quantummind Nov 17 '16 at 23:16
  • @quantummind: Do you mean after uncommenting the lines in sigchld_handler? – ViFI Nov 17 '16 at 23:35
  • Broken pipe happens when you try to write to a pipe after the reading end has been closed. I don't think it can happen when trying to read. – Barmar Nov 17 '16 at 23:40
  • Thanks @Barmar. That is a useful bit. So does it mean I should delete the line rf.close() from the sigchld_handler function, since this overall process is part of a loop? – ViFI Nov 17 '16 at 23:43
  • No errors for me in either case. The result however differs. With comments it runs through the while loop until i>5. When uncommenting, the last output line is "Main run count : (parent) 1" and keeps running. – quantummind Nov 17 '16 at 23:45
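The behaviour Barmar describes in the comments above can be reproduced in isolation. This is a minimal sketch, assuming Python 3 on a POSIX system with the interpreter's default handling of SIGPIPE (ignored); the function name write_without_reader is hypothetical. It closes the only read end of a fresh pipe and then writes to it, which fails with EPIPE ("Broken pipe").

```python
import errno
import os


def write_without_reader():
    """Write to a pipe whose read end is closed; return the errno raised."""
    rd, wr = os.pipe()
    os.close(rd)                  # no process holds the read end any more
    try:
        os.write(wr, b"1")        # nobody can ever read this
    except OSError as exc:        # BrokenPipeError is a subclass of OSError
        return exc.errno
    finally:
        os.close(wr)
    return None


if __name__ == "__main__":
    print(write_without_reader() == errno.EPIPE)
```

Reading from a pipe whose write ends are all closed does not raise this error; it simply returns an empty result to signal end of file.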

1 Answer


The problem with your code is that you are trying to reuse one pipe several times, which is not a valid use of a pipe in general. The exception you are getting is telling you: "Hey, you closed this pipe on the previous run. Once a pipe is closed, it's closed."

So you can change your code to create a pipe for each child, keep one end (the read end) in the parent, and give the other end to the child. Then it should work.

Edit 1. I've updated your code to use one pipe for every child. It's not how good code is supposed to look, but I hope it helps for educational purposes.

import os
import signal
import time


class A(object):
    def __init__(self):
        self.parent = 0
        self.child = 0
        self._child_pid = None
        signal.signal(signal.SIGCHLD, self.sigchld_handler)

    def sigchld_handler(self, a, b):
        self.parent += 1
        print "Main run count : (parent) ", self.parent
        os.close(self.wr)
        rf = os.fdopen(self.rd, 'r')
        message = rf.read()
        rf.close()
        print "Code from child [", self._child_pid, "]: ", message
        self.rd = None
        self._child_pid = None

    def run_child(self):
        self.child += 1
        print "Main run count (child) : ", self.child
        print "Running in child : " , os.getpid()
        os.close(self.rd)
        wr = os.fdopen(self.wr, 'w')
        text = "Hello from %s" % (self.child)
        print "C==>", text
        wr.write(text)
        wr.close()
        os._exit(os.EX_OK)

    def run(self):
        if self._child_pid:
            print "Child Process", self._child_pid, " already running."
        else:
            rd, wr = os.pipe()
            self.rd = rd
            self.wr = wr
            self._child_pid = os.fork()

            if not self._child_pid:
                self.run_child()


a = A()
i = 0
while True:
    a.run()
    time.sleep(4)
    i += 1
    if i > 5:
        break
Sergii Kozlov
  • By the way, can I keep the pipe open and instead just do some kind of flush operation on it? After your comments, I modified the code to fix the error, but I think with this approach we are opening too many file descriptors. Is it a good approach? – ViFI Nov 18 '16 at 00:05
  • I see your point. Yeah, pipes are not so flexible, which is why you will start using queues, sockets, etc. You can find a brief comparison of pipes and Unix sockets here: http://stackoverflow.com/questions/9475442/unix-domain-socket-vs-named-pipes – Sergii Kozlov Nov 18 '16 at 00:16
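On the question of keeping a single pipe open: one possible approach, which is not taken from this thread, is to never read until end of file. Each child writes one newline-terminated message, and the parent reads only up to that newline with os.read on the raw descriptor, so neither end has to be closed between children. This is a sketch assuming Python 3 on a POSIX system; the function name run_children and the newline framing are illustrative choices.

```python
import os


def run_children(n):
    """Fork n children one after another, all sharing a single pipe."""
    rd, wr = os.pipe()                # created once, reused for every child
    received = []
    for i in range(1, n + 1):
        pid = os.fork()
        if pid == 0:
            # Child: send one short newline-terminated message and exit.
            try:
                os.close(rd)
                os.write(wr, ("%d\n" % i).encode("ascii"))
            finally:
                os._exit(0)

        # Parent: read byte by byte until the newline that ends the message.
        # The parent keeps its own write end open, so EOF is not expected.
        line = b""
        while not line.endswith(b"\n"):
            chunk = os.read(rd, 1)
            if not chunk:             # defensive: unexpected EOF
                break
            line += chunk
        os.waitpid(pid, 0)            # reap the child before the next fork
        received.append(int(line.decode("ascii")))

    os.close(rd)
    os.close(wr)
    return received


if __name__ == "__main__":
    print(run_children(3))
```

Only two descriptors are open in the parent for the whole run. The trade-off is that the parent has to frame messages itself, which is part of what queues and sockets handle for you.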