I need to use logging in a multi-process application. As explained in the documentation, one approach is the QueueHandler/QueueListener pair, but I ran into a deadlock. So I started from the cookbook example and discovered that the problem only shows up if the logger thread starts before the sub-processes, and I can't explain why. Isn't starting the queue consumer before the producers the better thing to do anyway?
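For reference, the QueueHandler/QueueListener pairing from the documentation looks roughly like this (a minimal sketch with an illustrative console handler; the cookbook example below replaces the listener with an explicit logger thread):

import logging
import logging.handlers
import queue

log_queue = queue.Queue()
queue_handler = logging.handlers.QueueHandler(log_queue)   # producers log through the queue
console_handler = logging.StreamHandler()                  # the listener fans records out to real handlers
listener = logging.handlers.QueueListener(log_queue, console_handler)

root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(queue_handler)

listener.start()
root.debug('this record travels through the queue to the console handler')
listener.stop()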
Here is the cookbook code, showing which line I moved:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time


def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)


if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'DEBUG',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
                'level': 'DEBUG'
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'DEBUG',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()  # I need to start the logger thread before the sub-processes
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    # lp.start()  <-- but the cookbook example starts the logger thread here
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
I tried removing the logging part, simplifying everything down to a thread that writes to a file the data coming from a queue fed by the sub-processes, and the deadlock disappears, so I think the problem isn't in the queue but in the logging library:
from multiprocessing import Process, Queue
import threading
import time


def queue_listener_thread(q):
    with open('test.log', 'w') as f:
        while True:
            record = q.get()
            if record is None:
                break
            # simulate doing some work
            time.sleep(0.001)
            # finally write to the file
            f.write(f'{str(record)}\n')


def worker_process(q):
    # simulate doing some work
    time.sleep(0.1)
    for i in range(100):
        # simulate doing some work
        time.sleep(0.001)
        q.put_nowait(i)


if __name__ == '__main__':
    q = Queue()
    lp = threading.Thread(target=queue_listener_thread, args=(q,))
    lp.start()
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()
In my application the sub-processes start after logging is set up, terminate, and then other sub-processes can start later on; I think this is common in real-world applications.
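A minimal sketch of that lifecycle (illustrative names, no logging involved), just to show the pattern I mean: one long-lived consumer thread, with batches of worker processes that come and go:

from multiprocessing import Process, Queue
import threading


def consumer(q):
    # long-lived consumer: runs for the whole program lifetime
    while True:
        item = q.get()
        if item is None:
            break


def producer(q, batch):
    # short-lived worker: pushes a few items and exits
    for i in range(10):
        q.put((batch, i))


if __name__ == '__main__':
    q = Queue()
    t = threading.Thread(target=consumer, args=(q,))
    t.start()                      # consumer starts first...
    for batch in range(3):         # ...then workers come and go in batches
        workers = [Process(target=producer, args=(q, batch)) for _ in range(5)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
    q.put(None)                    # finally shut the consumer down
    t.join()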
I'm using Python 3.10.5 on Fedora Linux 35
Edit:
Following https://codewithoutrules.com/2017/08/16/concurrency-python/ I used gdb to understand what is happening; here are the last frames executed:
(gdb) py-bt
Traceback (most recent call first):
  File "/usr/lib64/python3.10/logging/__init__.py", line 1084, in flush
    self.stream.flush()
  File "/usr/lib64/python3.10/logging/__init__.py", line 1104, in emit
    self.flush()
  File "/usr/lib64/python3.10/logging/__init__.py", line 1218, in emit
    StreamHandler.emit(self, record)
I was wrongly blaming the file write because it's the slowest thing in the code, but the deadlock is in StreamHandler, so the culprit is the "screen output". So I rewrote the "simplified code" with a simple print instead of a file write and... again a deadlock! But why? Could it be a Python bug?
from multiprocessing import Process, Queue
import threading


def queue_listener_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        # print to screen instead of writing to file
        print(f'{str(record)}')


def worker_process(q):
    for i in range(100):
        q.put_nowait(i)


if __name__ == '__main__':
    q = Queue()
    lp = threading.Thread(target=queue_listener_thread, args=(q,))
    lp.start()
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()