1

I need to use logging in a multiprocess application. As explained in documentation one method is to use the couple QueueHandler/QueueListener but I stumbled in deadlock problem. So, I started from the cookbook example and I discovered that this problem manifests if the logger thread starts before sub-processes and I can't explain why. Isn't also a better thing start the queue consumer before the producer?

Here is the code of cookbook show witch row I moved:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'DEBUG',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
                'level': 'DEBUG'
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'DEBUG',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))

    lp.start() # I need start logger thread before subprocess
    
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    
    #lp.start() <-- but cookbook example starts logger thread here

    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

I tried removing the Logging part simplifying all to a thread that writes a file with data from a queue feeded by sub-processes and the deadlock disappear, so I think the problem isn't the in the queue but in the Logging library

from multiprocessing import Process, Queue
import threading
import time

def queue_listener_thread(q):
    with open('test.log', 'w') as f:
        while True:
            record = q.get()
            if record is None:
                break
            #simulate do something
            time.sleep(0.001)
            #finally write to file
            f.write(f'{str(record)}\n')


def worker_process(q):
    #simulate do something
    time.sleep(0.1)
    for i in range(100):
        #simulate do something
        time.sleep(0.001)
        q.put_nowait(i)



if __name__ == '__main__':
    q = Queue()
    lp = threading.Thread(target=queue_listener_thread, args=(q,))
    lp.start()
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...

    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

In my application the sub-processes starts after logging, terminate and other sub-processes can starts after, I think It's common in real world application

I'm using Python 3.10.5 on Fedora Linux 35

Edit:

As here https://codewithoutrules.com/2017/08/16/concurrency-python/ I used gdb to undestand what happen, here last rows executed:

(gdb) py-bt
Traceback (most recent call first):
  File "/usr/lib64/python3.10/logging/__init__.py", line 1084, in flush
    self.stream.flush()
  File "/usr/lib64/python3.10/logging/__init__.py", line 1104, in emit
    self.flush()
  File "/usr/lib64/python3.10/logging/__init__.py", line 1218, in emit
    StreamHandler.emit(self, record)

I was wrongly imputing the problem to file writing beacause it's the slowest thing in code, but the deadlock is in StreamHandler, so the bad thing is in "screen output". So I rewrote the "simplified code" with a simple print instead of a file write and... again deadlock! But why? Can be a python bug?

from multiprocessing import Process, Queue
import threading

def queue_listener_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        #print to screen instead of writing to file
        print(f'{str(record)}')


def worker_process(q):
    for i in range(100):
        q.put_nowait(i)



if __name__ == '__main__':
    q = Queue()
    lp = threading.Thread(target=queue_listener_thread, args=(q,))
    lp.start()
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...

    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
MegaMax
  • 11
  • 2
  • Logging itself contains threads (or at least locks), as well as you're creating your own threads. I haven't read through your entire program, but it is extremely common practice (across all languages and platforms that use fork) to fork first then spawn threads. This ensures things like locks are copied in the correct state. The deadlock scenario is that a thread is holding a lock when the fork occurs, and the fork will copy the lock in the locked state, but not copy the thread (fork doesn't copy threads typically). so no thread exists anymore to release the lock – Aaron Jul 03 '22 at 17:58
  • if I understand correctly is a bad practice fork a process with running threads so the only way I can think of it's running the logger in a separate process and leave "clean" the main process – MegaMax Jul 03 '22 at 20:02
  • You can make sure to start your processes first before trying to log anything, which should prevent most of the issues. Also using "spawn" or "forkserver" kinda eliminates the problem by just not forking the main process (sacrifice a little speed and the convienience of copying the memory space for a cleaner process space). – Aaron Jul 03 '22 at 20:05
  • as writed in the question i need start some subprocesses after the logging and I must find a solution to do it. I had seen that post before but meaning had escaped me, now with your explaination the meaning is more evident to me. Now I can try new ways to resolve the problem and I'll update here the results, a thousand thanks – MegaMax Jul 03 '22 at 20:48
  • I tried both setting spawn method in multiprocessing module and putting the logger another process before fork the subprocesses and deadlocks disappear! Thanks to @Aaron suggestions that unlocked me I understood that's is a common problem for who "unconsciously" mix multiprocessing with threads using fork method; – MegaMax Jul 05 '22 at 13:36
  • there are many articles about this like these: https://britishgeologicalsurvey.github.io/science/python-forking-vs-spawn/ https://pythonspeed.com/articles/python-multiprocessing/ There is also a ticket opened since 2009 (https://bugs.python.org/issue6721) but only a little notice in python documentation https://docs.python.org/3/library/multiprocessing.html saying " Note that safely forking a multithreaded process is problematic." IMHO it would take some billboards to warn!!! :) I hope this post will be useful to others! – MegaMax Jul 05 '22 at 13:36

0 Answers0