316

Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level, multiprocessing-aware logger, LOG = multiprocessing.get_logger(). Per the docs, this logger (EDIT) does not have process-shared locks, so you can garble things up in sys.stderr (or whatever filehandle) by having multiple processes write to it simultaneously.

The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, let alone for all clients of the framework. Are there alternatives I'm not thinking of?
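For concreteness, this is roughly the setup (a sketch only; module names are made up for illustration):

# central.py
import multiprocessing

LOG = multiprocessing.get_logger()   # the multiprocessing-aware logger

def spawn_workers(target, n):
    procs = [multiprocessing.Process(target=target) for _ in range(n)]
    for p in procs:
        p.start()
    return procs

# some_other_module.py -- not multiprocessing-aware
import logging
log = logging.getLogger(__name__)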

cdleary
  • 13
    The docs you link to state the exact opposite of what you say: the logger has no process-shared locks and things get mixed up - a problem I had as well. – Sebastian Blask Jan 12 '12 at 10:51
  • 3
    see examples in the stdlib docs: [Logging to a single file from multiple processes](http://docs.python.org/dev/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes). The recipes don't require other modules to be multiprocessing-aware. – jfs Sep 02 '12 at 23:05
  • So, what is the use case for `multiprocessing.get_logger()`? It seems, based on these other ways of doing logging, that the logging functionality in `multiprocessing` is of little value. – Tim Ludwinski Nov 26 '14 at 16:38
  • 5
    `get_logger()` is the logger used by `multiprocessing` module itself. It is useful if you want to debug a `multiprocessing` issue. – jfs Jun 29 '15 at 20:06

23 Answers

149

I just wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes but it seems to work pretty well.

(Note: This is hardcoded to RotatingFileHandler, which is my own use case.)


Update: @Javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI; the source is on GitHub at https://github.com/jruere/multiprocessing-logging


Update: Implementation!

This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've now been using this in production for several months, and the current version below works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
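A minimal usage sketch based on the comments below (POSIX fork start method assumed; the file name and format are just examples): install the handler once in the parent, before any workers are created, and child processes inherit it.

import logging
import multiprocessing

def worker(n):
    logging.getLogger(__name__).info("hello from worker %d", n)

if __name__ == "__main__":
    # set up the queue-backed handler in the parent, before forking
    mp_handler = MultiProcessingLog("my_app.log", mode="a", maxsize=1024 * 1024, rotate=5)
    mp_handler.setFormatter(logging.Formatter("%(asctime)s %(processName)s %(levelname)s %(message)s"))
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(mp_handler)

    procs = [multiprocessing.Process(target=worker, args=(n,)) for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()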
zzzeek
  • 1
    One nit: you need to import traceback as well. – Jason Baker Apr 02 '10 at 18:56
  • 1
    Is this code actually handling the problem? When you spawn off several processes using this handler, each process will have its own thread to check the queue and they write concurrently to the log file just like RotatingFileHandler would? Not only can log entries get mixed up, I also got stuff from different processes end up in different files as the rotation didn't seem to work well. Inspired by your solution though, I split the code, so I check on the queue in a thread that I start after spawning the processes. – Sebastian Blask Jan 12 '12 at 11:01
  • 5
    The above handler does all the file writing from the parent process and uses just one thread to receive messages passed from child processes. If you invoke the handler itself from a spawned child process then that's using it incorrectly, and you'll get all the same issues as RotatingFileHandler. I've used the above code for years with no issue. – zzzeek Jan 12 '12 at 23:29
  • 13
    Unfortunately this approach doesn't work on Windows. From http://docs.python.org/library/multiprocessing.html 16.6.2.12 "Note that on Windows child processes will only inherit the level of the parent process’s logger – any other customization of the logger will not be inherited." Subprocesses won't inherit the handler, and you can't pass it explicitly because it's not pickleable. – Noah Yetter Mar 02 '12 at 04:16
  • I think that only refers to the logger that's hardwired into the multiprocessing module. This recipe isn't making any usage of that nor should it care about propagation of loglevels, it just shuttles data from child to parent using normal multiprocessing channels. Have you tried it ? – zzzeek Mar 02 '12 at 06:35
  • 1
    I realize this is a year later, but I think the last comment is wrong. It relies on the effects of `logging.getLogger().addHandler(…)` being propagated to the children, which is exactly what _doesn't_ work. Sure, it doesn't rely on propagation of loglevels, but that's the part that _does_ work, so who cares? – abarnert Jul 10 '13 at 23:08
  • I see no documentation which says this (but point me to what you're seeing). When they talk about "multiprocessing.get_logger()" in that linked document, it is a hardwired logger in the multiprocessing module. it says nothing about the addHandler() method of the generic logging module. Again, have you tried it ? – zzzeek Jul 11 '13 at 00:47
  • 1
    Somehow not all messages get to file and some lines are garbled. Could you expand on how to properly instantiate the logger on Windows? I refer to your class from config read by `logging.config.fileConfig`. I read config and get a logger right after all import statements. – mlt Oct 01 '13 at 20:48
  • 2
    It's worth noting that `multiprocessing.Queue` uses a thread in `put()`. So do not invoke `put` (i.e. log a msg using the `MultiProcessingLog` handler) before creating all subprocesses. Otherwise the thread will be dead in the child process. One solution is to call `Queue._after_fork()` at the start of each child process, or to use `multiprocessing.queues.SimpleQueue` instead, which does not involve a thread but is blocking. – Danqi Wang Oct 11 '13 at 10:08
  • 10
    Could you add a simple example that shows initialization, as well as usage from a hypothetical child process? I'm not quite sure how the child process is supposed to get access to the queue without instantiating another instance of your class. – JesseBuesking Apr 10 '14 at 22:04
  • @JesseBuesking The handler above is assigned using addHandler() or setting it up in fileConfig() like any other handler. It's established at application startup time before any child process is created. The child process (on a POSIX system) will inherit the handler instance. – zzzeek Apr 12 '14 at 22:40
  • 2
    @zzzeek Thanks for your quick response. After toying with this a bit more, it appears this currently only works Linux (possibly Mac, I haven't tried). I've created a gist of what I ran on both platforms and the resulting output [here](https://gist.github.com/JesseBuesking/10674086). Would you happen to have an idea of what possible modifications would need to be made to get this working on both linux and windows? This is my first foray into multiprocessing with Python, which is why I'm having a rough time of this. – JesseBuesking Apr 14 '14 at 19:00
  • I've no experience with multiprocessing on Windows, or even how it could possibly work, as I thought Windows has no fork(). – zzzeek Apr 14 '14 at 22:48
  • 13
    @zzzeek, this solution is good but I couldn't find a package with it or something similar so I created one called `multiprocessing-logging`. – Javier Jan 22 '15 at 22:39
  • Thanks for this post, and for Javier's package to help understand the usage! – xlash May 05 '16 at 16:39
  • Just added this into my process based pipeline -- works great: ` logger = logging.getLogger('my_logger') logger.setLevel(logging.DEBUG) handler = MultiProcessingLog("pipeline/logs/job-1.csv", "a", 0, 0) logger.addHandler(handler) ` – eggie5 Jan 16 '18 at 14:18
  • 1
    @javier someone just pointed me here that you made a legit package so I've added it to the top, thanks! – zzzeek Jan 24 '18 at 15:07
  • Thanks! It has some tweaks that you might want to check! – Javier Jan 24 '18 at 20:36
  • @zzzeek does this work on a cluster with multiple machines – physicist Sep 06 '19 at 17:16
  • @Javier I have tried to use multiprocessing-logging and multiprocessing_logging.install_mp_handler(logger) but it seems that once a non-multiprocess line is written to the log it 'loses' its multiprocessing logging abilities. So using this code I can either log multiprocessing functions or non-multiprocessing but not both... is there a way? – Shani Shalgi Oct 10 '19 at 15:21
  • @ShaniShalgi please open a ticket in GitHub. This is not a good place for this discussion. – Javier Oct 10 '19 at 15:25
  • I opened a ticket but it also relates to the code written above. – Shani Shalgi Oct 10 '19 at 15:43
  • 2
    This won't work when the multiprocessing context is set to `spawn`, right? With the `spawn` context, new processes are fresh Python interpreters, not forks of the existing one, so they don't inherit anything from the main process. – Eric Frechette Aug 28 '20 at 18:17
  • 2
    @EricFrechette I just checked and this solution does not work if the context is set to `spawn` – David Segonds Apr 26 '21 at 22:20
92

The only way to deal with this non-intrusively is to:

  1. Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped.
  2. Your controller process can then do one of the following:
    • If using disk files: coalesce the log files at the end of the run, sorted by timestamp.
    • If using pipes (recommended): coalesce log entries on-the-fly from all pipes into a central log file. (E.g., periodically select from the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log. Repeat. A sketch of this pipe-based variant follows below.)
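Here is a rough sketch of the pipe-based variant (POSIX/fork only; all names are illustrative and buffering details are glossed over): each worker's stderr is replaced with the write end of a pipe, and the controller select()s over the read ends, timestamping complete lines into one central log.

import os
import select
import sys
import time
from multiprocessing import Process

def worker(n):
    # stands in for legacy code that writes to stderr
    for i in range(3):
        sys.stderr.write("worker %d: message %d\n" % (n, i))
        sys.stderr.flush()
        time.sleep(0.01)

def spawn_with_pipe(n):
    r, w = os.pipe()
    def target():
        os.dup2(w, 2)            # point this child's fd 2 (stderr) at the pipe
        os.close(r)
        worker(n)
    p = Process(target=target)   # fork start method: the closure is inherited, not pickled
    p.start()
    os.close(w)                  # the controller keeps only the read end
    return p, r

if __name__ == "__main__":
    procs, fds = zip(*[spawn_with_pipe(n) for n in range(4)])
    buffers = {fd: b"" for fd in fds}
    with open("central.log", "ab") as log:
        open_fds = set(fds)
        while open_fds:
            ready, _, _ = select.select(list(open_fds), [], [])
            for fd in ready:
                chunk = os.read(fd, 4096)
                if not chunk:                       # EOF: that child exited
                    if buffers[fd]:
                        log.write(("%f " % time.time()).encode() + buffers[fd] + b"\n")
                    os.close(fd)
                    open_fds.discard(fd)
                    continue
                buffers[fd] += chunk
                *lines, buffers[fd] = buffers[fd].split(b"\n")
                for line in lines:                  # flush only complete lines
                    log.write(("%f " % time.time()).encode() + line + b"\n")
    for p in procs:
        p.join()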
vladr
  • Nice, that was 35s before I thought of that (thought I'd use `atexit` :-). Problem is that it won't give you a realtime readout. This may be part of the price of multiprocessing as opposed to multithreading. – cdleary Mar 13 '09 at 04:41
  • 1
    @cdleary, using the piped approach it would be as near-realtime as one can get (especially if stderr is not buffered in the spawned processes.) – vladr Mar 13 '09 at 04:46
  • +1 I had this general thought too. I especially like your on-the-fly idea. – mechanical_meat Mar 13 '09 at 05:01
  • Okay, but then wouldn't you need the coalescer process to be a central dispatcher that gave each child process a new shared stderr pipe? That would mean that people couldn't use the libraries traditionally, but would have to hand a callback over to the coalescer/dispatcher. – cdleary Mar 13 '09 at 05:06
  • And by "shared stderr pipe" I don't mean shared among child processes, but shared between the coalescer and child process, as you're describing. – cdleary Mar 13 '09 at 05:09
  • Do you have control *between* forks? If so you just `dup` new per-child fd's over stderr (2) just before forking a new child; the child's stderr (2) output will automatically be picked up by the spawner process' coalescer through the corresponding per-child fd. – vladr Mar 13 '09 at 05:13
  • 1
    Incidentally, big assumption here: not Windows. Are you on Windows? – vladr Mar 13 '09 at 05:15
  • I'm using POSIX (but os.dup is on both platforms). I don't see how you can get around the fact you need the `select` in a centralized event loop, which would presumably be where the coalescer lives. Am I missing something? – cdleary Mar 13 '09 at 05:26
  • So anyway, if you are on *nix (i.e. multiprocess is using fork) you can dup a new fd over stderr (2) in Process.start (just before multiprocess calls self._popen = Popen(self), where Popen will do the actual fork) - check out the source code in lib/process.py, lib/forking.py – vladr Mar 13 '09 at 05:28
  • Using Pool? If so then you'd have to use the async variants of map or apply and do the select loop until you get all results. Or spawn a thread (har-har) to do the select polling. :) – vladr Mar 13 '09 at 05:32
  • Wow... that's crazy but would work. Spawn a coalescer thread so that it shares the main process' stderr locks, (hide the real sys.stderr in the coalescer, give sys a fake one for the coalescer to select on), have the coalescer terminate after join on subprocesses, and join on the coalescer `atexit`. – cdleary Mar 13 '09 at 05:38
  • Yes it will. :) Used this approach a while ago (but in perl, not python) to coalesce real-time log output from multiple remote ssh sessions. have fun! – vladr Mar 13 '09 at 06:49
  • 3
    @BrandonRhodes - As I said, *non-intrusively*. Using `multiprocessing.Queue` will not be simpler if there is a lot of code to rewire to use `multiprocessing.Queue`, and/or if [performance is an issue](http://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue) – vladr Oct 09 '13 at 17:07
  • @vladr The benchmark is inconclusive in this scenario. You would have to compare *multiple* pipes against *one* queue. – schlamar Oct 16 '13 at 06:33
  • @vladr Plus, your are blindly assuming that everyone wants a log stream. But what if you want to use advanced handlers (SocketHandler, Sentry, ...) and still collect the entries in main process before further processing? And you would have to configure loggers (in every subprocess) to send the entries to a stream so how exactly do you define *non-intrusively*? – schlamar Oct 16 '13 at 07:18
  • 1
    @schlamar you may want to re-read the OP before commenting; I do not *assume* a log stream, rather the OP *clearly states* that the legacy code *already* writes to a stream (`stderr`) and that he still expects aggregate logging to go to a stream, albeit with some degree of line-level atomicity (non-garbled). Do you now see why this method is non-intrusive for the OP? As for the benchmark, the *number* of pipes is not relevant; the gains come from *buffering* reducing the actual number of system calls (and impact on *client* performance) in exchange for extra latency in the aggregator proc. – vladr Oct 16 '13 at 19:31
52

QueueHandler is native in Python 3.2+, and does exactly this. It is easily replicated in previous versions.

Python docs have two complete examples: Logging to a single file from multiple processes

Each process (including the parent process) puts its logging on the Queue, and then a listener thread or process (one example is provided for each) picks those up and writes them all to a file - no risk of corruption or garbling.

For those using Python < 3.2, use the logutils package (which provides the same code as the Python 3.2 stdlib).
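A condensed sketch of the pattern those examples follow (Python 3.2+; the helper names here are mine, not from the docs): the listener process is the only one that touches the log file, and every other process just puts records on the queue via a QueueHandler.

import logging
import logging.handlers
import multiprocessing

def listener(queue):
    # the only process that writes to the file
    logging.basicConfig(filename="app.log",
                        format="%(asctime)s %(processName)s %(levelname)s %(message)s")
    while True:
        record = queue.get()
        if record is None:          # sentinel: shut down
            break
        logging.getLogger(record.name).handle(record)

def worker(queue, n):
    root = logging.getLogger()
    root.handlers[:] = [logging.handlers.QueueHandler(queue)]   # everything goes to the queue
    root.setLevel(logging.INFO)
    logging.info("hello from worker %d", n)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    lp = multiprocessing.Process(target=listener, args=(q,))
    lp.start()
    workers = [multiprocessing.Process(target=worker, args=(q, n)) for n in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    q.put(None)                     # stop the listener
    lp.join()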

fantabolous
  • 3
    this should be the accepted answer, at least since the advent of `QueueHandler`. It is non-intrusive and transparent, and works regardless of what logger configuration(s) the main process is using. The workers always log to their configured `QueueHandler`. It also does not expect any kind of logger configuration to be passed from the parent to the spawned child process. – g.pickardou Jun 02 '22 at 05:32
37

Below is another solution with a focus on simplicity for anyone else (like me) who gets here from Google. Logging should be easy! Only for Python 3.2 or higher.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
user2133814
  • 2
    The `QueueHandler` and `QueueListener` classes can be used on Python 2.7 as well, available in the [`logutils`](https://pythonhosted.org/logutils/queue.html) package. – Lev Levitsky Jan 13 '18 at 21:43
  • 7
    The logger of the main process should also use a QueueHandler. In your current code, the main process is bypassing the queue so there can be race conditions between the main process and workers ones. Everyone should log to the queue (via a QueueHandler) and only the QueueListener should be allowed to log to the StreamHandler. – Ismael EL ATIFI Mar 05 '18 at 16:20
  • 1
    Also, you don't have to initialize the logger in each child. Just initialize the logger in the parent process, and get the logger in each child process. – petertc Jan 14 '20 at 06:22
25

As of 2020 it seems there is a simpler way of logging with multiprocessing.

This function will create the logger. You can set the format here and where you want your output to go (file, stdout):

def create_logger():
    import multiprocessing, logging
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(\
        '[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
    handler = logging.FileHandler('logs/your_file_name.log')
    handler.setFormatter(formatter)

    # this bit will make sure you won't have 
    # duplicated messages in the output
    if not len(logger.handlers): 
        logger.addHandler(handler)
    return logger

In the init you instantiate the logger:

if __name__ == '__main__': 
    from multiprocessing import Pool
    logger = create_logger()
    logger.info('Starting pooling')
    p = Pool()
    # rest of the code

Now, you only need to add this reference in each function where you need logging:

logger = create_logger()

And output messages:

logger.info(f'My message from {something}')

Hope this helps.

Iopheam
  • 2
    This seems like the most straightforward solution now. Note that the "if not len(logger.handlers)" part is assuming you will use a single handler. If you want to have more than one handler to, e.g., send all messages to a file but only INFO and above to stdout, then you'll need to adjust that part. – Colin Nov 01 '20 at 00:15
  • 2
    Normally you have vast amounts of code that just does _import logging_ and then uses things like 'logging.info("whatever")" - there's no place you can pass a logger object to anything, and there's no chance you can retrofit that code. – James Moore Feb 01 '21 at 21:16
  • 2
    This works but it's not very flexible. For example, once you put create_logger() into all your functions, there's no way to turn off logging in case someone else wants to use your library with their own application. Best practice for libraries is to never force anyone to see the log messages. – medley56 Oct 21 '21 at 21:19
  • @JamesMoore I haven't tried using `logging.info(..)` with multiprocessing. If this works, I'm happy to update the answer. – Iopheam Oct 22 '21 at 13:03
  • 1
    @medley56 Agree. Perhaps, this code isn't intended to work in a library. I used it when I needed to debug a multiprocessing scrapper for an ad-hoc task. – Iopheam Oct 22 '21 at 13:05
  • 1
    In order to avoid inserting `create_logger()` in every function, you can set `globals()['logger'] = multiprocessing.get_logger()` once inside the subprocess, assuming your functions use `logger.info(...)` – Jean Monet Jul 28 '22 at 18:23
22

Yet another alternative might be the various non-file-based logging handlers in the logging package:

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(and others)

This way, you could easily have a logging daemon somewhere that you could write to safely and would handle the results correctly. (E.g., a simple socket server that just unpickles the message and emits it to its own rotating file handler.)

The SyslogHandler would take care of this for you, too. Of course, you could use your own instance of syslog, not the system one.
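For example, a client-side sketch using SocketHandler (this assumes a receiving log daemon is running, such as the socket server from the logging cookbook's network-logging example; host and port are placeholders): every process, parent or child, simply attaches the handler, and the daemon does the file writing.

import logging
import logging.handlers

def configure_socket_logging(host="localhost",
                             port=logging.handlers.DEFAULT_TCP_LOGGING_PORT):
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    # SocketHandler pickles each LogRecord and ships it over TCP; the receiving
    # daemon unpickles it and hands it to its own handlers (e.g. a rotating file)
    root.addHandler(logging.handlers.SocketHandler(host, port))

configure_socket_logging()
logging.info("safe to log from any process")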

Ali Afshar
14

A variant of the others that keeps the logging and queue thread separate.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not reliable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
ironhacker
  • I like the idea of fetching the logger name from the queue record. This allows one to use conventional `fileConfig()` in MainProcess and a barely configured logger in PoolWorkers (with only `setLevel(logging.NOTSET)`). As I mentioned in another comment, I'm using Pool so I had to obtain my Queue (proxy) from Manager instead of multiprocessing so it can be pickled. This allows me to pass the queue to a worker inside of a dictionary (most of which is derived from an argparse object using `vars()`). I feel like in the end this is the best approach for MS Windows, which lacks fork() and breaks the @zzzeek solution. – mlt Oct 02 '13 at 22:01
  • @mlt I think you could also put a multiprocessing Queue in the init instead of using a Manager (see answer to http://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes - it's about Locks but I believe it works for Queues as well) – fantabolous Aug 18 '15 at 05:08
  • @fantabolous That won't work on MS Windows or any other platform that lacks `fork`. That way each process will have its own independent, useless queue. The second approach in the linked Q/A won't work on such platforms. It is a recipe for non-portable code. – mlt Aug 26 '15 at 20:42
  • @mlt Interesting. I'm using Windows and it seems to work ok for me - not long after I last commented I set up a pool of processes sharing a `multiprocessing.Queue` with the main process and I've been using it constantly since. Won't claim to understand why it works though. – fantabolous Aug 27 '15 at 01:13
11

All current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:

  • You can use any logging configuration you want
  • Logging is done in a daemon thread
  • Safe shutdown of the daemon by using a context manager
  • Communication to the logging thread is done by multiprocessing.Queue
  • In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue
  • New: format traceback and message before sending to queue to prevent pickling errors

Code with usage example and output can be found at the following Gist: https://gist.github.com/schlamar/7003737
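For orientation, here is a rough sketch of the listener side of that architecture (my own condensation, not the Gist's code; it leans on the stdlib QueueHandler for brevity): a daemon thread drains the queue, and a context manager shuts it down cleanly via a sentinel.

import logging
import logging.handlers
import multiprocessing
import threading
from contextlib import contextmanager

@contextmanager
def queue_logging_listener(queue):
    def drain():
        while True:
            record = queue.get()
            if record is None:                      # shutdown sentinel
                break
            logging.getLogger(record.name).handle(record)
    t = threading.Thread(target=drain, daemon=True)
    t.start()
    try:
        yield queue
    finally:
        queue.put(None)                             # ask the thread to finish
        t.join()

def patched_worker(queue):
    # stand-in for the Gist's Logger patching: route every record to the queue
    root = logging.getLogger()
    root.handlers[:] = [logging.handlers.QueueHandler(queue)]
    root.setLevel(logging.DEBUG)
    logging.getLogger("worker").info("logged from a subprocess")

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)        # any configuration you want
    q = multiprocessing.Queue()
    with queue_logging_listener(q):
        p = multiprocessing.Process(target=patched_worker, args=(q,))
        p.start()
        p.join()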

schlamar
  • Unless I'm missing something, this isn't actually a daemon thread, since you never set `daemon_thread.daemon` to `True`. I needed to do that in order to get my Python program to exit properly when an exception occurs within the context manager. – blah238 Jun 30 '16 at 21:22
  • I also needed to catch, log and swallow exceptions thrown by the target `func` in `logged_call`, otherwise the exception would get garbled with other logged output. Here's my modified version of this: https://gist.github.com/blah238/8ab79c4fe9cdb254f5c37abfc5dc85bf – blah238 Jun 30 '16 at 22:27
  • @blah238 If you set @schlamar 's `daemon` (let's call it the QueueListener, for a better naming) as an actual daemon thread, you risk that being abruptly stopped when the main program exits. Imagine the queue has buffered quite a lot of messages, the main program comes to the end, exits the context manager, the `None` sentinel is added on top of the full queue, and then the main process terminates before the listener (`daemon`) is able to dequeue and handle all log messages. You would lose those messages. How are you handling this situation in your code? – Michele Piccolini Sep 16 '20 at 15:02
10

Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.

Moreover, the PyZMQ module, the Python bindings for ZMQ, implements PUBHandler, which is an object for publishing logging messages over a zmq.PUB socket.

There's a solution on the web for centralized logging from a distributed application using PyZMQ and PUBHandler, which can easily be adapted to work locally with multiple publishing processes.

import logging
import os
import socket
from logging import handlers
from multiprocessing import Process

import zmq
from zmq.log.handlers import PUBHandler

# `config` (with PUBSUB_LOGGER_PORT), MAX_WORKERS_PER_CLIENT and stop_event are
# assumed to come from the surrounding application.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

    @property
    def sub(self):
        return self._sub

    @property
    def logger(self):
        return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = PUBLogger('127.0.0.1').logger

    def run(self):
        self._logger.info("{0} - Sending message".format(self.proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting the subscriber process so we won't lose the publishers' messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1', stop_event,))
sub_logger_process.start()

# Starting publisher processes
processes = []
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
Samuel
  • A stdlib version using sockets is provided here: https://docs.python.org/3/howto/logging-cookbook.html#sending-and-receiving-logging-events-across-a-network, though it may need some slight adjustments as per use-case – Jean Monet Jul 28 '22 at 18:29
6

I also like zzzeek's answer but Andre is correct that a queue is required to prevent garbling. I had some luck with the pipe, but did see garbling which is somewhat expected. Implementing it turned out to be harder than I thought, particularly due to running on Windows, where there are some additional restrictions about global variables and stuff (see: How's Python Multiprocessing Implemented on Windows?)

But, I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also does not support setting the formatter or anything other than the root logger. Basically, you have to reinit the logger in each of the pool processes with the queue and set up the other attributes on the logger.

Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks yet :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
Mike Miller
  • 1
    I wonder if `if 'MainProcess' == multiprocessing.current_process().name:` can be used in place of passing `child`? – mlt Oct 02 '13 at 00:30
  • In case someone else is trying to use process pool instead of separate process objects on Windows, it worth mentioning that [Manager](http://stackoverflow.com/questions/7865430/multiprocessing-pool-picklingerror-cant-pickle-type-thread-lock-attribu#comment9595439_7865512) shall be used to pass queue to subprocesses as it is not picklable directly. – mlt Oct 02 '13 at 06:15
  • This implementation worked well for me. I modified it to work with an arbitrary number of handlers. This way you can configure you root handler in a non-multiprocessing fashion, then where it is safe to make the queue, pass the root handlers to this, delete them, and make this the only handler. – Jaxor24 Apr 27 '15 at 23:51
4

I'd like to suggest using the logger_tt library: https://github.com/Dragon2fly/logger_tt

The multiprocessing_logging library does not work on my macOS, while logger_tt does.

rudaoshi
  • I don't know why there is disagreement with my answer. The logger_tt library is certainly the most friendly logging library for multiprocessing. – rudaoshi Mar 08 '21 at 11:19
4

The concurrent-log-handler package seems to do the job perfectly. Tested on Windows. It also supports POSIX systems.

Main idea

  • Create a separate file with a function that returns a logger. The logger must have a fresh instance of ConcurrentRotatingFileHandler for each process. An example function get_logger() is given below.
  • Creating loggers is done at the initialization of the process. For a multiprocessing.Process subclass, this means the beginning of the run() method.

Detailed instructions

In this example, I will use the following file structure

.
│-- child.py        <-- For a child process
│-- logs.py         <-- For setting up the logs for the app
│-- main.py         <-- For a main process
│-- myapp.py        <-- For starting the app
│-- somemodule.py   <-- For an example, a "3rd party module using standard logging"

Code

Child process

# child.py 

import multiprocessing as mp
import time
from somemodule import do_something


class ChildProcess(mp.Process):
    def __init__(self):
        self.logger = None
        super().__init__()

    def run(self):
        from logs import get_logger
        self.logger = get_logger()


        while True:
            time.sleep(1)
            self.logger.info("Child process")
            do_something()

  • A simple child process that inherits from multiprocessing.Process and simply logs the text "Child process" to the file.
  • Important: get_logger() is called inside run(), or elsewhere inside the child process (not at module level or in __init__()). This is required because get_logger() creates a ConcurrentRotatingFileHandler instance, and a new instance is needed for each process.
  • The do_something is used just to demonstrate that this works with 3rd party library code which does not have any clue that you are using concurrent-log-handler.

Main Process

# main.py

import logging
import multiprocessing as mp
import time

from child import ChildProcess
from somemodule import do_something


class MainProcess(mp.Process):
    def __init__(self):
        self.logger = logging.getLogger()
        super().__init__()

    def run(self):
        from logs import get_logger

        self.logger = get_logger()
        self.child = ChildProcess()
        self.child.daemon = True
        self.child.start()

        while True:
            time.sleep(0.5)
            self.logger.critical("Main process")
            do_something()


  • The main process, which logs "Main process" to the file twice a second. It also inherits from multiprocessing.Process.
  • The same comments about get_logger() and do_something() apply as for the child process.

Logger setup

# logs.py

import logging
import os

from concurrent_log_handler import ConcurrentRotatingFileHandler

LOGLEVEL = logging.DEBUG


def get_logger():
    logger = logging.getLogger()

    if logger.handlers:
        return logger

    # Use an absolute path to prevent file rotation trouble.
    logfile = os.path.abspath("mylog.log")

    logger.setLevel(LOGLEVEL)

    # Rotate log after reaching 512K, keep 5 old copies.
    filehandler = ConcurrentRotatingFileHandler(
        logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
    )
    filehandler.setLevel(LOGLEVEL)

    # create also handler for displaying output in the stdout
    ch = logging.StreamHandler()
    ch.setLevel(LOGLEVEL)

    formatter = logging.Formatter(
        "%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
    )

    # add formatter to ch
    ch.setFormatter(formatter)
    filehandler.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(filehandler)

    return logger
  • This uses the ConcurrentRotatingFileHandler from the concurrent-log-handler package. Each process needs a fresh ConcurrentRotatingFileHandler instance.
  • Note that all the arguments for the ConcurrentRotatingFileHandler should be the same in every process.

Example app

# myapp.py 

if __name__ == "__main__":
    from main import MainProcess

    p = MainProcess()
    p.start()
  • Just a simple example of how to start the multiprocess application.

Example of 3rd party module using standard logging

# somemodule.py 

import logging

logger = logging.getLogger("somemodule")

def do_something():
    logging.info("doing something")

  • Just a simple example to test if loggers from 3rd party code will work normally.

Example output

2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]

Niko Föhr
3

Just publish your instance of the logger somewhere. That way, the other modules and clients can use your API to get the logger without having to import multiprocessing.
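A tiny sketch of that idea (module and function names are made up): the framework exposes one accessor, and everything else imports that instead of touching multiprocessing directly.

# framework/log.py
import multiprocessing

_logger = multiprocessing.get_logger()

def get_logger():
    return _logger

# anywhere else in the framework or in client code:
#   from framework.log import get_logger
#   LOG = get_logger()
#   LOG.warning("spawning workers")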

Javier
  • 1
    The problem with this is that the multiprocessing loggers appear unnamed, so you won't be able to decipher the message stream easily. Maybe it would be possible to name them after creation, which would make it more reasonable to look at. – cdleary Mar 13 '09 at 04:45
  • Well, publish one logger for each module, or better, export different closures that use the logger with the module name. The point is to let other modules use your API. – Javier Mar 13 '09 at 04:48
  • Definitely reasonable (and +1 from me!), but I would miss being able to just `import logging; logging.basicConfig(level=logging.DEBUG); logging.debug('spam!')` from anywhere and have it work properly. – cdleary Mar 13 '09 at 05:07
  • 3
    It's an interesting phenomenon that I see when I use Python, that we get so used to being able to do what we want in 1 or 2 simple lines that the simple and logical approach in other languages (eg. to publish the multiprocessing logger or wrap it in an accessor) still feels like a burden. :) – Kylotan Mar 13 '09 at 12:00
3

I liked zzzeek's answer. I would just substitute a Queue for the Pipe, since if multiple threads/processes use the same pipe end to generate log messages, they will get garbled.

André Cruz
  • I was having some issues with the handler, though it wasnt that messages were garbled, its just the whole thing would stop working. I changed Pipe to be Queue since that is more appropriate. However the errors I was getting weren't resolved by that - ultimately I added a try/except to the receive() method - very rarely, an attempt to log exceptions will fail and wind up being caught there. Once I added the try/except, it runs for weeks with no problem, and a standarderr file will grab about two errant exceptions per week. – zzzeek Nov 24 '09 at 16:51
2

How about delegating all the logging to another process that reads all log entries from a Queue?

import logging
import multiprocessing

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logging.getLogger('some_config')
        self.log.info("Started Central Logging process")
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Simply share LOG_QUEUE via any of the multiprocess mechanisms or even inheritance and it all works out fine!
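A sketch of the producer side, continuing the snippet above (the worker function is mine): workers put (level, message) tuples on LOG_QUEUE, matching what run() expects, and a (None, None) tuple tells the central logger to shut down.

def worker(q, n):
    q.put((logging.INFO, "worker %d did something" % n))

workers = [multiprocessing.Process(target=worker, args=(LOG_QUEUE, i)) for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

LOG_QUEUE.put((None, None))          # sentinel: stops CentralLogger.run()
central_logger_process.join()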

Sawan
2

Below is a class that can be used in a Windows environment; it requires ActivePython (for the win32 extension modules). You can also inherit from it for other logging handlers (StreamHandler, etc.).

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

And here is an example that demonstrates usage:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initialize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
  • Probably using `multiprocessing.Lock()` instead of Windows Mutex would make the solution portable. – xmedeko Sep 25 '16 at 19:28
1

Deadlocks caused by the combination of locks, threads, and forks in the logging module are reported in bug report 6721 (see also the related SO question).

There is a small fixup solution posted here.

However, that will only fix potential deadlocks in logging. It will not fix the possibility that things get garbled. See the other answers presented here.

Albert
1

I have a solution that's similar to ironhacker's, except that I use logging.exception in some of my code and found that I needed to format the exception before passing it back over the Queue, since tracebacks aren't pickleable:

import logging
import traceback
import cStringIO

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
Richard Jones
1

Here's my simple hack/workaround... not the most comprehensive, but easily modifiable and, I think, simpler to read and understand than any of the other answers I found before writing this:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
nmz787
1

There is this great package

Package: https://pypi.python.org/pypi/multiprocessing-logging/

code: https://github.com/jruere/multiprocessing-logging

Install:

pip install multiprocessing-logging

Then add:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
juan Isaza
  • 4
    This library is literally based off of another comment on the current SO post: https://stackoverflow.com/a/894284/1698058. – Chris Hunt Jan 09 '19 at 03:30
  • Origins: https://stackoverflow.com/a/894284/1663382 I appreciate the example usage of the module, in addition to the documentation on the homepage. – Liquidgenius Feb 21 '20 at 08:14
  • This module does not work if the multiprocessing context is `spawn` (default on >3.8 with MacOS) – David Segonds Apr 26 '21 at 22:25
1

For whoever might need this, I wrote a decorator for the multiprocessing_logging package that adds the current process name to logs, so it becomes clear who logs what.

It also runs install_mp_handler(), so it becomes unnecessary to run it yourself before creating a pool.

This allows me to see which worker creates which log messages.

Here's the blueprint with an example:

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also wrap existing, undecorated functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')
Orsiris de Jong
0

Simplest idea as mentioned:

  • Grab the filename and the process id of the current process.
  • Set up a WatchedFileHandler. The reasons for this handler are discussed in detail here, but in short there are certain worse race conditions with the other logging handlers. This one has the shortest window for the race condition (a sketch follows below).
  • Choose a path to save the logs to, such as /var/log/...
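A minimal sketch of that idea (the directory and format are just examples): each process attaches a WatchedFileHandler pointing at its own PID-keyed file.

import logging
import logging.handlers
import os

def setup_per_process_log(directory="/var/log/myapp"):
    logfile = os.path.join(directory, "worker-%d.log" % os.getpid())
    handler = logging.handlers.WatchedFileHandler(logfile)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(process)d %(levelname)s %(message)s"))
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(handler)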
user1460675
0

One of the alternatives is to write the multiprocessing logging to a known file and register an atexit handler to join on those processes and read it back on stderr; however, you won't get a real-time flow of the output messages on stderr that way.
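A rough sketch of that workaround (the file name and setup are illustrative, and concurrent appends can still interleave for very long lines): the workers append to a known file, and an atexit hook in the parent dumps it to stderr once everything has been joined.

import atexit
import logging
import multiprocessing
import sys

LOGFILE = "mp-run.log"

def worker(n):
    logging.basicConfig(filename=LOGFILE, level=logging.INFO)
    logging.info("worker %d done", n)

def dump_log_to_stderr():
    with open(LOGFILE) as f:
        sys.stderr.write(f.read())

if __name__ == "__main__":
    atexit.register(dump_log_to_stderr)
    procs = [multiprocessing.Process(target=worker, args=(n,)) for n in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()          # join before exit so the file is complete when atexit runs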

cdleary
  • is the approach you are proposing below identical to the one from your comment here http://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python#comment455047_641488 – iruvar Aug 06 '12 at 17:37