
My requirement is similar to Multiple producers, single consumer, except I need it in Python.

I have created an application that spawns 5 concurrent processes (I am using the multiprocessing library). These 5 processes independently produce output in dict format.

Earlier I was printing the output to the console, but now I would like to write it to a file.

I am looking for a pattern where all my 5 producers write to a shared queue that supports concurrent writes.

And a single consumer process that also has access to this queue and consumes the data from it, with the ability to wait when there is no data to write and to terminate when the producers are done with their task.

Thanks Anuj

Anupam Saini

2 Answers


I've implemented this pattern in Python where a supervisor process spawns a bunch of processes and then consumes log messages from all of them and writes those log messages into a single log file.

Basically, I used execve to spawn the processes and specified that stderr for each process was connected to a PTY. Then my supervisor opened all the master PTYs and used select to read from them in a loop. The PTYs are line-buffered by the tty line discipline, and you can use readline on them for non-blocking reads. I believe I also used fcntl on the PTYs to set os.O_NONBLOCK as well.

It works great. The only hitch is that you need to read more than one line per PTY when you return from the select poll; otherwise you can lose output (assuming you have something reaping the child processes and restarting them). By reading all lines available on each PTY you also avoid tracebacks getting interleaved with other messages.
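The original answer did not include code, so here is a rough sketch of that approach, assuming Python 3 on Linux; the "./worker" command, file names, and buffer size are placeholders, not from the answer:

import fcntl
import os
import pty
import select

def spawn_on_pty(argv):
    pid, master_fd = pty.fork()       # child's stdin/stdout/stderr go to the slave PTY
    if pid == 0:
        os.execvp(argv[0], argv)      # child: become the real program
    # parent: make reads from the master end non-blocking
    flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    return master_fd

masters = [spawn_on_pty(['./worker', str(i)]) for i in range(5)]

with open('merged.log', 'wb') as log:
    while masters:
        ready, _, _ = select.select(masters, [], [])
        for fd in ready:
            try:
                while True:           # drain everything buffered on this PTY
                    chunk = os.read(fd, 4096)
                    if not chunk:     # EOF: child exited
                        masters.remove(fd)
                        break
                    log.write(chunk)
            except BlockingIOError:
                pass                  # no more data right now
            except OSError:           # EIO on Linux when the slave end closes
                masters.remove(fd)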

If you really need to send objects rather than text lines, then you are better off using a real pub-sub messaging system like AMQP or ZeroMQ. AMQP is a much bigger hammer than you need, so only check it out if you expect to build lots of similar apps. Otherwise, try the simpler 0MQ (http://www.zeromq.org/intro:read-the-manual), which is just a messaging library that makes sockets much easier to use.
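For completeness, here is a minimal pyzmq sketch of the fan-in flavor of this idea (PUSH/PULL rather than full pub-sub); the endpoint and payload are made up for illustration:

import zmq

def producer(endpoint='tcp://127.0.0.1:5557'):
    sock = zmq.Context.instance().socket(zmq.PUSH)
    sock.connect(endpoint)                       # many producers connect
    sock.send_json({'url': 'http://example.com', 'data': 42})

def consumer(endpoint='tcp://127.0.0.1:5557'):
    sock = zmq.Context.instance().socket(zmq.PULL)
    sock.bind(endpoint)                          # the single consumer binds
    while True:
        record = sock.recv_json()                # blocks until a message arrives
        print(record)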

Michael Dillon
  • You can also do some line buffering in your merger process, inserting banners as needed into the log output so you can keep the outputs straight. – Mike DeSimone Sep 01 '11 at 05:03
  • Thanks Michael. In my case I do not have logs, actually. Each of my processes will be given a URL; using the Selenium WebDriver API it will navigate to the URL, and once done I will fetch certain data from the DOM in dict format. Each process will generate a dict from an input URL and put it in a queue; insertion order is not important. At write time a single consumer will try fetching 5 objects from the queue, block if none are available, and write to a file. Your answer, though helpful, would be even better if you could include some code as well. – Anupam Saini Sep 01 '11 at 05:11

Since you are already using multiprocessing, all you need is the Queue class.

And here is a sample (modified from the Queue docs):

from multiprocessing import Process, Queue

def child(q, url):
    result = my_process(url)  # my_process: your per-URL worker
    q.put(result)

if __name__ == '__main__':
    q = Queue()
    urls = [...]
    children = []
    for url in urls:
        p = Process(target=child, args=(q, url))
        p.start()
        children.append(p)
    for p in children:
        p.join()
        print(q.get())  # or write to file (might not be the answer from this child)

Edit: for multiple answers from each child, replace the last for loop with:

import multiprocessing

while multiprocessing.active_children():  # active_children() returns a list, so test its truthiness
    print(q.get())  # caution: results still queued when the last child exits are not printed
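A common variant of this pattern (not from the original answer; the sentinel and file name here are illustrative, and my_process is the same placeholder as above) has each producer put a sentinel when it finishes. The consumer blocks on q.get() while the queue is empty and stops after seeing one sentinel per producer, which matches the wait-then-terminate behavior the question asks for:

from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical end-of-stream marker

def child(q, url):
    q.put(my_process(url))  # my_process: your per-URL worker (placeholder)
    q.put(SENTINEL)         # signal that this producer is finished

def consumer(q, n_producers, path):
    done = 0
    with open(path, 'w') as out:
        while done < n_producers:
            item = q.get()       # blocks while the queue is empty
            if item is SENTINEL:
                done += 1
            else:
                out.write('%s\n' % item)

if __name__ == '__main__':
    q = Queue()
    urls = [...]
    writer = Process(target=consumer, args=(q, len(urls), 'results.txt'))
    writer.start()
    children = [Process(target=child, args=(q, u)) for u in urls]
    for p in children:
        p.start()
    for p in children:
        p.join()
    writer.join()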
Mihai Stan
  • Shouldn't this childs.append(child) be childs.append(p)? – Anupam Saini Sep 01 '11 at 09:24
  • Please also let me know the behavior when I fetch objects from the queue. Consider the scenario where the queue is quite large: I want to continuously poll the queue for results and wait on it when it is empty. – Anupam Saini Sep 01 '11 at 09:30
  • Accepting the answer as I am using multiprocessing queues in my code. But for child process management I have a separate monitoring thread in my parent process that continuously monitors each process's status and the output returned, to check for child processes that are dead or blocked for a long time. Actually, I spawn Firefox and Xvfb instances in the child processes, and Firefox being Firefox, it tends to hang often and makes my child process unresponsive :) – Anupam Saini Aug 27 '12 at 07:55
  • The p.join() will block waiting on the child's q.put(), eventually causing a deadlock; it's explained under "Joining processes that use queues" at https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming – vikkyhacks Jan 30 '19 at 17:44
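Given that caveat, a minimal fix, assuming (as in the sample above) exactly one result per child, is to drain the queue before joining:

results = [q.get() for _ in urls]  # consume everything first, so no child
for p in children:                 # is left blocked inside q.put()
    p.join()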