I can't get logging to a single file working with multiprocessing.Pool.apply_async. I'm trying to adapt this example from the Logging Cookbook, but it only works for multiprocessing.Process. Passing the logging queue into apply_async doesn't seem to have any effect. I would like to use a Pool so that I can easily manage the number of simultaneous worker processes.

The following adapted example with multiprocessing.Process works ok for me, except I am not getting log messages from the main process, and I don't think it will work well when I have 100 large jobs.

import logging
import logging.handlers
import numpy as np
import time
import multiprocessing
import pandas as pd
log_file = 'PATH_TO_FILE/log_file.log'

def listener_configurer():
    root = logging.getLogger()
    h = logging.FileHandler(log_file)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)


# This is the worker process top-level function: it logs a start message,
# sleeps for the requested time, then logs a completion message.
def worker_function(sleep_time, name, queue, configurer):
    configurer(queue)
    start_message = 'Worker {} started and will now sleep for {}s'.format(name, sleep_time)
    logging.info(start_message)
    time.sleep(sleep_time)
    success_message = 'Worker {} has finished sleeping for {}s'.format(name, sleep_time)
    logging.info(success_message)

def main_with_process():
    start_time = time.time()
    single_thread_time = 0.
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        name = str(i)
        sleep_time = np.random.randint(10) / 2
        single_thread_time += sleep_time
        worker = multiprocessing.Process(target=worker_function,
                                         args=(sleep_time, name, queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()
    end_time = time.time()
    final_message = "Script execution time was {}s, but single-thread time was {}s".format(
        (end_time - start_time),
        single_thread_time
    )
    print(final_message)

if __name__ == "__main__":
    main_with_process()

But I can't get the following adaptation to work:

def main_with_pool():
    start_time = time.time()
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    pool = multiprocessing.Pool(processes=3)
    job_list = [np.random.randint(10) / 2 for i in range(10)]
    single_thread_time = np.sum(job_list)
    for i, sleep_time in enumerate(job_list):
        name = str(i)
        pool.apply_async(worker_function,
                         args=(sleep_time, name, queue, worker_configurer))

    queue.put_nowait(None)
    listener.join()
    end_time = time.time()
    print("Script execution time was {}s, but single-thread time was {}s".format(
        (end_time - start_time),
        single_thread_time
    ))

if __name__ == "__main__":
    main_with_pool()

I've tried many slight variations using multiprocessing.Manager, multiprocessing.Queue, multiprocessing.get_logger, and apply_async(...).get(), but haven't gotten any of them to work.

I would think there would be an off-the-shelf solution for this. Should I try Celery instead?

thanks

GrayOnGray
  • I solved this problem by following torek's advice. I have a working [example](https://github.com/ClayCampaigne/multiprocessing-pool-logging/blob/master/pool_logging.py) on github. – GrayOnGray Jan 01 '18 at 22:23

3 Answers

There are actually two separate problems here, which are intertwined:

  • You cannot pass a multiprocessing.Queue() object as an argument to a Pool-based function (you can pass it to the worker you start directly, but not any "further in" as it were).
  • You must wait for all the asynchronous workers to complete before you send the None through to your listener process.

To fix the first one, replace:

queue = multiprocessing.Queue(-1)

with:

queue = multiprocessing.Manager().Queue(-1)

as a manager-managed Queue() instance can be passed through.

To fix the second, either collect each result from each asynchronous call, or close the pool and wait for it, e.g.:

pool.close()
pool.join()
queue.put_nowait(None)

or the more complex:

getters = []
for i, sleep_time in enumerate(job_list):
    name = str(i)
    getters.append(
        pool.apply_async(worker_function,
                         args=(sleep_time, name, queue, worker_configurer))
    )
while len(getters):
    getters.pop().get()
# optionally, close and join pool here (generally a good idea anyway)
queue.put_nowait(None)

(You should also consider replacing your put_nowait with a waiting version of put and not using unlimited length queues.)
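
Putting both fixes together, a minimal sketch of a corrected main_with_pool might look like the following (main_with_pool_fixed is just an illustrative name; it reuses worker_function, listener_process, listener_configurer and worker_configurer from the question):

def main_with_pool_fixed():
    start_time = time.time()
    # Fix 1: a manager-managed queue can be pickled and passed to pool workers.
    manager = multiprocessing.Manager()
    queue = manager.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    pool = multiprocessing.Pool(processes=3)
    job_list = [np.random.randint(10) / 2 for i in range(10)]
    single_thread_time = np.sum(job_list)
    results = [pool.apply_async(worker_function,
                                args=(sleep_time, str(i), queue, worker_configurer))
               for i, sleep_time in enumerate(job_list)]
    # Fix 2: wait for every asynchronous job before sending the sentinel.
    for r in results:
        r.get()
    pool.close()
    pool.join()
    queue.put(None)  # a blocking put, per the note above
    listener.join()
    print("Script execution time was {}s, but single-thread time was {}s".format(
        time.time() - start_time, single_thread_time))

Note that because pool workers are reused for several tasks, worker_configurer as written can add a QueueHandler to the same root logger more than once; the comments below and the last answer address that separately.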

torek
  • Thanks! I have a remaining problem, which is that every time a worker picks up a task, the number of identical messages it sends for each logging event increments by one. I tried adding an `if not len(root.handlers):` condition before adding a handler in the `worker_configurer` function, but it didn't do anything. I am now consulting this question, [Logging with multiprocessing madness](https://stackoverflow.com/questions/20332359/logging-with-multiprocessing-madness). (I realize some of my functions are misnamed because I am adapting code from a `Process` setting.) – GrayOnGray Jan 01 '18 at 16:03
  • I solved the remaining problem I just mentioned by initializing the pool with the keyword argument `maxtasksperchild=1` so that each worker is cleaned up after a single task and each task spawns a new worker. Despite the similarity of the problem, I found nothing I thought worth using in the question [Logging with multiprocessing madness](https://stackoverflow.com/questions/20332359/logging-with-multiprocessing-madness) and its answers. – GrayOnGray Jan 01 '18 at 17:40
  • 1
    It's fundamentally a bit tricky, especially if you want portability, since each process is independent of its original parent, but on non-Windows systems each process starts with the state its original parent, including any logger configuration. The logging module is designed to work as a sort of singleton for the root logging instance; meanwhile, handlers are stackable and hence not-singleton-ish. So you end up with a crazy-quilt set of possibilities for each worker. sayan's answer—doing logging from a single point—is a lot easier to keep straight in your head. – torek Jan 01 '18 at 17:46
  • Unfortunately portability between Windows and non-Windows is a pretty high priority for me. – GrayOnGray Jan 01 '18 at 17:57

Consider using two queues. The first queue is where you put the data for the workers. After finishing a job, each worker pushes its results onto the second queue. Then consume this second queue to write the log to the file.
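
A minimal sketch of this idea (run_job, main_two_queues and the message strings are illustrative names, not taken from the question; the Pool's own task distribution stands in for the first queue, and result_queue is the second):

import logging
import multiprocessing
import time

def run_job(sleep_time, name, result_queue):
    # Workers never touch the logging machinery; they just push plain strings.
    result_queue.put('Worker {} started and will sleep for {}s'.format(name, sleep_time))
    time.sleep(sleep_time)
    result_queue.put('Worker {} finished sleeping for {}s'.format(name, sleep_time))

def main_two_queues():
    logging.basicConfig(filename='log_file.log', level=logging.INFO,
                        format='%(asctime)s %(message)s')
    manager = multiprocessing.Manager()
    result_queue = manager.Queue()
    pool = multiprocessing.Pool(processes=3)
    jobs = [pool.apply_async(run_job, args=(i / 2, str(i), result_queue))
            for i in range(10)]
    for job in jobs:
        job.get()                        # wait for every worker to finish
    pool.close()
    pool.join()
    while not result_queue.empty():      # drain the second queue into the log file
        logging.info(result_queue.get())

if __name__ == '__main__':
    main_two_queues()

This sketch drains the second queue only after every job has finished; for long-running work you would instead consume it in a loop while the pool is still busy, or hand it to a dedicated listener process as in the question.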

sayan
  • This seems easier to understand and manage -- I'm not sure I'll ever understand the interaction of logging and multiprocessing -- but also potentially limiting because ultimately I'm going to be parallelizing a pretty long multi-part algorithm that I have created for another use, and I will want to preserve the logging that is specific to the various parts of that algorithm. – GrayOnGray Jan 01 '18 at 18:02

[ADDENDUM] Regarding maxtasksperchild=1:
You don't really need it. The repeated messages were caused by repeatedly adding QueueHandlers to the root logger of the same child process. The following code checks whether any handlers exist before adding another:

def worker_configurer(queue):
    root = logging.getLogger()
    # print(f'{root.handlers=}')
    if len(root.handlers) == 0:
        h = logging.handlers.QueueHandler(queue)   
        root.addHandler(h)
        root.setLevel(logging.DEBUG)

eliu