I have a class, MyLogger, that sends messages to a log server using PUBHandler.

An exception is raised when MyLogger is instantiated in the LogWorker.__init__() method (version 2 below); however, it works if MyLogger is instantiated in the LogWorker.log_worker() method (version 1).

Any suggestions would be appreciated.

import logging
from multiprocessing import Process
import os
import random
import sys
import time

import zmq
from zmq.log.handlers import PUBHandler


class MyLogger(object):
    ''''''

    def __init__(self, port, handler=None):
        self.port = port
        self.handler = handler or self._construct_sock_handler()
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            self.logger.addHandler(self.handler)


    def _construct_sock_handler(self):
        context = zmq.Context()
        log_sock = context.socket(zmq.PUB)
        log_sock.connect("tcp://127.0.0.1:%i" % self.port)
        time.sleep(0.1)
        handler = PUBHandler(log_sock)
        return handler


    def get_logger(self):
        return self.logger


def sub_logger(port, level=logging.DEBUG):
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.bind('tcp://127.0.0.1:%i' % port)
    sub.setsockopt(zmq.SUBSCRIBE, "")
    logging.basicConfig(level=level)

    while True:
        level, message = sub.recv_multipart()
        if message.endswith('\n'):
            # trim trailing newline, which will get appended again
            message = message[:-1]
        log = getattr(logging, level.lower())
        log(message)


class LogWorker(object):

    def __init__(self):
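        # version 1: leave __init__ empty (just `pass`)
        # version 2: create the logger here; `port` is the module-level global
        self.logger = MyLogger(port).get_logger()   # version 2

    def log_worker(self, port):
        # version 1: create the logger here instead
        # self.logger = MyLogger(port).get_logger()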
        print "starting logger at %i with level=%s" % (os.getpid(), logging.DEBUG)

        while True:
            level = logging.INFO
            self.logger.log(level, "Hello from %i!" % os.getpid())
            time.sleep(1)

if __name__ == '__main__':
    if len(sys.argv) > 1:
        n = int(sys.argv[1])
    else:
        n = 2

    port = 5555

    workers = [Process(target=LogWorker().log_worker, args=(port,)) for _ in range(n)]
    [w.start() for w in workers]

    try:
        sub_logger(port)
    except KeyboardInterrupt:
        pass
    finally:
        [ w.terminate() for w in workers ]
user3666197
Fujiao Liu
  • **ZeroMQ** architecture strongly discourages sharing anything: never share Socket instances among threads, let alone a Context among processes. Python is no exception. Treat each code-execution unit as an independent **Agent** and create its part of the ZeroMQ Scalable Formal Communication Pattern architecture independently inside it. Enjoy, LFJ, the powers of ZeroMQ & the StackOverflow Community Pool of Knowledge – user3666197 Jul 29 '15 at 19:25
  • @user3666197, thanks for editing my question. – Fujiao Liu Jul 30 '15 at 05:33
  • Glad you enjoy the StackOverflow community here. As a next step, you may be interested in reading a book mentioned in http://stackoverflow.com/a/25742744/3666197 – user3666197 Jul 30 '15 at 10:01

1 Answer

Answer from pyzmq owner minrk:

You cannot pass zmq contexts or sockets across the fork boundary that happens when you instantiate a subprocess with multiprocessing. You have to make sure that you create your Context after you are in the subprocess.

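Solution:

def work():
    # Runs inside the child process, so the zmq Context is created after the fork.
    # Assumes LogWorker.__init__ now takes the port and log_worker() takes no arguments.
    worker = LogWorker(port)
    worker.log_worker()

workers = [ Process(target=work) for _ in range(n) ]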
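
A related way to follow the same rule is to keep the worker object free of sockets until it is first used inside the child. The sketch below is only an illustration under assumptions: `DeferredLogWorker`, `make_logger` and `plain_logger` are hypothetical names that do not appear in the question, and the zmq part is replaced by a plain `logging` logger so the sketch runs with the standard library alone. In real use the factory would presumably return `MyLogger(port).get_logger()`.

```python
import logging
import os
from multiprocessing import Process


class DeferredLogWorker(object):
    """Holds plain settings in the parent; builds the logger on first use."""

    def __init__(self, port, make_logger):
        # Runs in the parent process: store data only, open no sockets here.
        self.port = port
        self.make_logger = make_logger  # hypothetical factory: port -> logger
        self.logger = None

    def get_logger(self):
        # When log_once() is the Process target, the first call happens
        # in the child, so the logger is created after the fork.
        if self.logger is None:
            self.logger = self.make_logger(self.port)
        return self.logger

    def log_once(self):
        self.get_logger().info("Hello from %i!", os.getpid())


def plain_logger(port):
    # Stand-in factory (assumption); swap in MyLogger(port).get_logger().
    return logging.getLogger("worker-%i" % port)


if __name__ == "__main__":
    worker = DeferredLogWorker(5555, plain_logger)
    p = Process(target=worker.log_once)
    p.start()
    p.join()
```

Constructing `DeferredLogWorker` in the parent is then harmless, because nothing that must stay on one side of the fork exists until `get_logger()` runs in the child.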
user3666197
Fujiao Liu