1

I need to run a GStreamer pipeline to perform video streaming. The pipeline requires a GObject.MainLoop object, whose run() method does not return until quit() is called. To handle this, I create a process (P2) from my main application process (P1), and P2 runs the GObject.MainLoop instance in its main thread. The problem is that the loop runs indefinitely within P2 and I'm unable to quit it from P1.

The following code illustrates the scenario.

'''
start() spawns a new process P2 that runs Mainloop within its main thread.
stop() is called from P1, but does not quit the Mainloop. This is probably because 
processes do not have shared memory
'''
from multiprocessing import Process
import gi

from gi.repository import GObject

class Main:
    def __init__(self):
        self.process = None
        self.loop = GObject.MainLoop()

    def worker(self):
        self.loop.run()

    def start(self):
        self.process=Process(target=self.worker, args=())
        self.process.start()

    def stop(self):
        self.loop.quit()

Next, I tried using a multiprocessing Queue to share the 'loop' variable between the processes, but I am still unable to quit the main loop.

'''
start() spawns a new process and puts the loop object in a multiprocessing Queue
stop() calls get() on the queue and then calls quit() on the returned object, but the main loop still does not quit.
'''
from multiprocessing import Process, Queue
import gi

from gi.repository import GObject

class Main:
    def __init__(self):
        self.p=None
        self.loop = GObject.MainLoop()
        self.queue = Queue()

    def worker(self):
        self.queue.put(self.loop)
        self.loop.run()


    def start(self):
        self.p=Process(target=self.worker, args=())
        self.p.start()

    def stop(self):
        # receive loop instance shared by Child Process
        loop=self.queue.get()
        loop.quit()

How do I call the quit method for the MainLoop object which is only accessible within the child Process P2?

Rhythm Chopra

2 Answers

0

First, we need to use threads rather than processes. Each process has its own address space, so a loop object created in one process cannot be reached from another.

What is the difference between a process and a thread?

Try passing the main loop object to a separate thread that does the actual work. This turns your main method into nothing but a basic GLib event processing loop, but that is fine and is the normal behavior in many GLib applications.

Lastly, we need to handle the race condition in which the worker thread finishes its work before the main loop starts running. The while not loop.is_running() check in the worker does this.

from threading import Thread
import gi

from gi.repository import GObject

def worker(loop):
    while not loop.is_running():
        print("waiting for loop to run")

    print("working")

    loop.quit()
    print("quitting")

class Main:
    def __init__(self):
        self.thread = None
        self.loop = GObject.MainLoop()

    def start(self):
        self.thread=Thread(target=worker, args=(self.loop,))
        self.thread.start()
        self.loop.run()

def main():
    GObject.threads_init()

    m = Main()
    m.start()

if __name__ == '__main__':
    main()
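
The address-space point can also be illustrated without GLib installed. This is only a sketch: `StandInLoop` is a hypothetical class that mimics the run()/quit() shape of a main loop using threading.Event, and it is not the GLib implementation. Because threads share memory, the worker thread can stop the loop that the main thread is blocked in.

```python
import threading


class StandInLoop:
    """Hypothetical stand-in for a main loop: run() blocks until quit() is called."""

    def __init__(self):
        self._quit_requested = threading.Event()

    def run(self):
        self._quit_requested.wait()

    def quit(self):
        self._quit_requested.set()


def run_demo():
    loop = StandInLoop()
    log = []

    def worker():
        # ... the actual work would happen here ...
        loop.quit()                 # same object the main thread is running
        log.append("worker called quit")

    thread = threading.Thread(target=worker)
    thread.start()
    loop.run()                      # returns once the worker has called quit()
    thread.join()
    log.append("loop returned")
    return log


if __name__ == "__main__":
    print(run_demo())
```

The stand-in remembers a quit() request made before run() starts, so it needs no is_running() polling. That is a property of this stand-in only, not a statement about how GObject.MainLoop behaves.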
mpr
  • Even that doesn't work. Calling stop() method after that doesn't quit the loop. I tried printing loop status by self.loop.is_running() both before and after self.loop.quit() in the stop() method, it says False, i.e. that loop instance was not running. Moreover, on sending terminate signal to self.process, its exitcode returns -15, i.e. the process didn't terminate normally. So the mainloop was still running inside the child Process. Thanks for your response and Please share if you have any other edits. – Rhythm Chopra Dec 07 '17 at 17:14
  • Let me edit the sample code here and remove the stop() method. I'm not sure why it would be needed. Didn't you just want the worker to stop the processing? In that case it'd be done simply by calling loop.quit() from the worker. – mpr Dec 07 '17 at 17:39
  • The worker you are using to create the process is just to quit the loop. But the Main class method worker which starts the loop is never called. So where should I call that one? – Rhythm Chopra Dec 07 '17 at 17:49
  • There were two other issues related to the fact that we were using processes instead of threads, and that there was a race condition. Updated the answer. – mpr Dec 07 '17 at 18:31
  • There is an issue with python threading, that it doesn't allow parallelism ([Python Multiprocessing vs Multithreading](https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python)). And I will need multiple instances of mainloop object running simultaneously. So I had to opt for Multiprocessing. Moreover, in case of Threading, there is no such issue because due to shared space, I can control mainloop instance running in child thread from my main thread as well. So, there is no problem in case of threading. – Rhythm Chopra Dec 07 '17 at 18:49
  • And in the code you updated, the main loop doesn't stay alive for long. As soon as mainloop.run() executes, the worker thread's while loop exits and calls mainloop.quit(). So in practice, the MainLoop is in the running state for just a fraction of a second. – Rhythm Chopra Dec 07 '17 at 18:52
  • You may want to, in your question, draw out the ultimate process structure you want. e.g. do you want to run eight simultaneous processes, each with their own GMainLoop? If that's the case you'll probably need to use process level communication instead of GLib methods. – mpr Dec 07 '17 at 19:14
  • Yeah, that is exactly what I want to accomplish. But the issue is, as soon as GMainLoop starts within a process, the process gets stuck with the loop which prevents it from being available for any sort of communication from the Parent process. So, I just need a way to quit GMainLoop running in another process from the Parent process when Child process is stuck with the mainloop. And as far I have read, there is no way to quit GMainLoop without GMainLoop.quit() method. Please mention if I'm missing something. – Rhythm Chopra Dec 07 '17 at 19:34
  • So you're saying there is a parent process with 8 child processes. What determines when the child processes terminate? Do they just run to completion? Does the parent act as a coordinator? You may want to re-ask your question at a more general level. I think it goes beyond just GMainLoop behavior. – mpr Dec 07 '17 at 19:45
  • Yeah, Actually there is a Main class with GMainLoop Instance in it. When I create 8 object instances of that class inside my Main Application Process, resulting in 8 GMainLoop instances which will run in 8 child Processes. All these are to be started from Main Process, which is pretty simple. But quitting them individually from Parent Process only, is something not actually happening. And yeah, Main Process is the coordinator for all the child Processes. – Rhythm Chopra Dec 07 '17 at 19:57
0

I made my Main class extend multiprocessing.Process and overrode its run() method so that the GObject.MainLoop instance runs in a separate thread (T1) rather than in the main thread of P2. I then implemented a wait-notify mechanism: the main thread of P2 waits in a loop, and P1 uses a multiprocessing.Queue to forward messages to it, notifying P2 each time a message is sent. For example, stop() sends the quit message to P2, and the handler for it is defined in the overridden run() method. The class can be extended to pass any number of messages to the child process, provided their handlers are also defined.

The following is the code I used.

from multiprocessing import Process, Condition, Queue
from threading import Thread
import gi

from gi.repository import GObject

loop=GObject.MainLoop()

def worker():
    loop.run()

class Main(Process):

    def __init__(self, target=None, args=()):
        self.target=target
        self.args=tuple(args)
        print(self.args)
        self.message_queue = Queue()
        self.cond = Condition()
        self.thread = None
        self.loop = GObject.MainLoop()
        Process.__init__(self)

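    def run(self):
        if self.target:
            # Run the target (the main loop) in a separate thread of the child process
            self.thread = Thread(target=self.target, args=())
            print("running target method")
            self.thread.start()

        while True:
            # Wait for a notification from the parent, then read the message
            with self.cond:
                self.cond.wait()
            # message_queue.get() itself blocks until a message arrives
            msg = self.message_queue.get()
            if msg == 'quit':
                print(loop.is_running())
                loop.quit()
                print(loop.is_running())
                break
            else:
                print('message received', msg)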

    def send_message(self, msg):
        self.message_queue.put(msg)
        with self.cond:
            self.cond.notify_all()

    def stop(self):
        self.send_message("quit")
        self.join()

    def func1(self):
        self.send_message("msg 1") # handler is defined in the overridden run method

    # few others functions which will send unique messages to the process, and their handlers 
    # are defined in the overridden run method above

This method works fine for my scenario, but suggestions are welcome if there is a better way to do the same.
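
One possible simplification, offered only as a sketch under stated assumptions: since Queue.get() already blocks until a message arrives, the queue alone may be enough to drive the child. In the example below `StandInLoop`, `child_main`, and the `commands`/`events` queue names are all hypothetical, the stand-in replaces GObject.MainLoop so the example runs without GStreamer, and the "fork" start method assumes a POSIX system.

```python
import multiprocessing as mp
import threading


class StandInLoop:
    """Hypothetical stand-in for a main loop: run() blocks until quit() is called."""

    def __init__(self):
        self._quit_requested = threading.Event()

    def run(self):
        self._quit_requested.wait()

    def quit(self):
        self._quit_requested.set()


def child_main(commands, events):
    loop = StandInLoop()                       # created inside the child
    runner = threading.Thread(target=loop.run)
    runner.start()                             # loop runs in thread T1
    while True:
        msg = commands.get()                   # blocks until the parent sends something
        if msg == "quit":
            loop.quit()
            break
        events.put("received " + msg)
    runner.join()
    events.put("loop stopped")


def run_demo():
    ctx = mp.get_context("fork")               # assumption: POSIX system
    commands, events = ctx.Queue(), ctx.Queue()
    proc = ctx.Process(target=child_main, args=(commands, events))
    proc.start()
    commands.put("msg 1")
    commands.put("quit")
    log = [events.get(), events.get()]         # read replies before joining
    proc.join()
    return log, proc.exitcode


if __name__ == "__main__":
    print(run_demo())
```

The parent only ever touches the queues, and the loop object lives entirely in the child, so nothing has to be shared across the process boundary.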

Rhythm Chopra