
I'm trying to integrate structlog into an application that uses multiprocessing. While I plan to switch to structlog completely, I also want to capture stdlib logging calls from third-party libraries. Since we plan to emit both a JSON logfile and a key-value output, I think the correct way is to delegate the actual formatting to logging:

    logging   \
               structlog processors (worker) -> logging queuehandler -> logging handler (main process) -> stdlib logging output
    structlog /

As per https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes, we use a QueueHandler to send all log records to the main process for writing.
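(For reference, the cookbook pattern boils down to the minimal sketch below; names like cookbook_worker are illustrative, and in my code further down I use a hand-rolled consumer loop instead of QueueListener, but the idea is the same:)

import logging
import logging.handlers
import multiprocessing


def cookbook_worker(queue):
    # workers get exactly one handler: the QueueHandler
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(queue))
    root.setLevel(logging.INFO)
    root.info("hello from %s", multiprocessing.current_process().name)


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    # the real handler lives in the main process; QueueListener feeds it
    listener = logging.handlers.QueueListener(queue, logging.StreamHandler())
    listener.start()
    p = multiprocessing.Process(target=cookbook_worker, args=(queue,))
    p.start()
    p.join()
    listener.stop()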

This is my current configuration:

import logging.config
import logging.handlers
import multiprocessing
import sys
import traceback

import structlog
from structlog.processors import JSONRenderer, KeyValueRenderer


def proc_info(logger, method, event_dict):
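    # tag the event with the name of the process that emitted it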
    event_dict['worker-process'] = multiprocessing.current_process().name
    return event_dict

pre_chain = [
    structlog.stdlib.add_log_level,
    proc_info
]

def configure_root_logging():
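    # main process only: the real formatters/handlers hang off the root logger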
    logging.config.dictConfig({
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "plain": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": KeyValueRenderer(),
                    "foreign_pre_chain": pre_chain,
                },
                "json": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": JSONRenderer(),
                    "foreign_pre_chain": pre_chain,
                },
            },
            "handlers": {
                "console_kvp": {
                    "level": "DEBUG",
                    "class": "logging.StreamHandler",
                    "formatter": "json",
                },
                "console_json": {
                    "level": "DEBUG",
                    "class": "logging.StreamHandler",
                    "formatter": "plain",
                },
            },
            "loggers": {
                "": {
                    "handlers": ["console_kvp", "console_json"],
                    "level": "DEBUG",
                    "propagate": True,
                },
            }
    })


def configure_structlog():
    structlog.configure(
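        # wrap_for_formatter must be the last processor: it packs the
        # event_dict so ProcessorFormatter can unpack it on the stdlib side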
        processors=pre_chain + [structlog.stdlib.ProcessorFormatter.wrap_for_formatter],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )


def setup_worker_logging(logging_queue):
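    # workers only enqueue records; formatting happens in the main process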
    root = logging.getLogger()
    queue_handler = logging.handlers.QueueHandler(logging_queue)
    root.addHandler(queue_handler)
    root.setLevel(logging.INFO)


def worker_entrypoint(logging_queue):
    setup_worker_logging(logging_queue)
    configure_structlog()

    logger = logging.getLogger(__name__)
    logger.warning("worker_startup_std_logging")

    slogger = structlog.get_logger()
    slogger.warning("worker_startup_structlog")


def handle_logging_queue_messages(queue):
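    # main-process consumer, modeled on the cookbook's listener:
    # replay each record through the locally configured handlers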
    while True:
        try:
            record = queue.get()
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except KeyboardInterrupt:
            break
        except Exception:
            print('LOGGING FAILED!!! ', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


def main():
    logging_queue = multiprocessing.Queue()

    t = multiprocessing.Process(
        target=worker_entrypoint, args=(logging_queue,), daemon=True,
        name='worker-9999')
    t.start()

    configure_root_logging()
    configure_structlog()

    handle_logging_queue_messages(logging_queue)

    slogger = structlog.get_logger()
    slogger.warning("Startup")


if __name__ == '__main__':
    main()

Note: I've used a StreamHandler for both output formats while debugging, but the JSON-formatted output will eventually go to a FileHandler, hence the QueueHandler in between.
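For completeness, the eventual JSON file handler would only swap the handler class in the dictConfig above (the file_json name and app.log.json path are placeholders):

"handlers": {
    "file_json": {
        "level": "DEBUG",
        "class": "logging.FileHandler",
        "filename": "app.log.json",  # placeholder path
        "formatter": "json",
    },
},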

The output is:

{"event": "worker_startup_std_logging", "level": "warning", "worker-process": "MainProcess"}
event='worker_startup_std_logging' level='warning' worker-process='MainProcess'
{"event": "{'event': 'worker_startup_structlog', 'level': 'warning', 'worker-process': 'worker-9999'}", "level": "warning", "worker-process": "MainProcess"}
event="{'event': 'worker_startup_structlog', 'level': 'warning', 'worker-process': 'worker-9999'}" level='warning' worker-process='MainProcess'

There are multiple issues here:

  1. The worker-process field carries the name of the main process, not the name of the worker process.
  2. The event field of the structlog message contains the serialized event_dict, which does hold the correct worker-process value (see the debug sketch below).
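A throwaway debug variant of handle_logging_queue_messages (reusing the imports above) makes the second point visible: record.msg already arrives in the main process as a plain str, because QueueHandler.prepare() formats the record before enqueueing it:

def handle_logging_queue_messages_debug(queue):
    while True:
        record = queue.get()
        # debug only: msg is already a str here -- QueueHandler.prepare()
        # ran format() in the worker, so the event_dict never crosses the
        # queue as a dict
        print(type(record.msg), repr(record.msg), file=sys.stderr)
        logging.getLogger(record.name).handle(record)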

I expected structlog to evaluate the processor chain before handing off to logging's QueueHandler.

Can someone please explain how structlog is supposed to work in this situation?
