
When using sklearn, I want to see the output, so I use the verbose option when available.

Generally, I want timestamps, process IDs etc., so I use the Python logging module when I can. Getting sklearn output into the logging module has been done before, e.g. https://stackoverflow.com/a/50803365

However, I want to run in parallel, and joblib also uses sys.stdout and sys.stderr directly. Therefore, my attempt (see below) does not work.

import logging
import sys
import contextlib

class LogAdapter:
    """Minimal file-like object that forwards writes to a logger."""

    def __init__(self, level, logger) -> None:
        # Pick the logger method matching the requested level.
        if level == 'INFO':
            self.report = logger.info
        elif level == 'ERROR':
            self.report = logger.error

    def write(self, msg):
        # Ignore the bare newlines that print() writes separately.
        stripped = msg.rstrip()
        if len(stripped) > 0:
            self.report(stripped)

    def flush(self):
        # Nothing to flush; the logging handlers manage their own output.
        pass

@contextlib.contextmanager
def redirect_to_log(logger):
    # Swap the standard streams for the duration of the with-block.
    originals = sys.stdout, sys.stderr
    sys.stdout = LogAdapter(level='INFO', logger=logger)
    sys.stderr = LogAdapter(level='ERROR', logger=logger)
    try:
        yield
    finally:
        # Restore the real streams even if the body raises.
        sys.stdout, sys.stderr = originals


def test_case():
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.utils import parallel_backend
    logger = logging.getLogger(__name__)
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(process)d | %(asctime)s | %(name)14s | %(levelname)7s | %(message)s",
    )

    for backend_name in ['loky', 'threading']:
        logger.info(f"Testing backend {backend_name}")
        with parallel_backend(backend_name), redirect_to_log(logger):
            # Tiny toy problem; verbose=4 makes sklearn/joblib print progress messages.
            clf = RandomForestClassifier(2, verbose=4)
            X = [[0, 0], [1, 1]]
            Y = [0, 1]
            clf = clf.fit(X, Y)

if __name__ == "__main__":
    test_case()

I get this output:

19320 | 2022-11-30 17:49:16,938 |       __main__ |    INFO | Testing backend loky
19320 | 2022-11-30 17:49:16,951 |       __main__ |   ERROR | [Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
building tree 1 of 2
building tree 2 of 2
19320 | 2022-11-30 17:49:18,923 |       __main__ |   ERROR | [Parallel(n_jobs=-1)]: Done   2 out of   2 | elapsed:    1.9s remaining:    0.0s
19320 | 2022-11-30 17:49:18,923 |       __main__ |   ERROR | [Parallel(n_jobs=-1)]: Done   2 out of   2 | elapsed:    1.9s finished
19320 | 2022-11-30 17:49:18,924 |       __main__ |    INFO | Testing backend threading
19320 | 2022-11-30 17:49:18,925 |       __main__ |   ERROR | [Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
19320 | 2022-11-30 17:49:18,932 |       __main__ |    INFO | building tree 1 of 2
19320 | 2022-11-30 17:49:18,932 |       __main__ |    INFO | building tree 2 of 2
19320 | 2022-11-30 17:49:18,934 |       __main__ |   ERROR | [Parallel(n_jobs=-1)]: Done   2 out of   2 | elapsed:    0.0s remaining:    0.0s
19320 | 2022-11-30 17:49:18,934 |       __main__ |   ERROR | [Parallel(n_jobs=-1)]: Done   2 out of   2 | elapsed:    0.0s finished

As you can see, it works nicely with the threading backend, but not with the loky backend. Loky is for multiprocessing, and my context manager only catches stdout and stderr in the main process. How do I capture the stdout of the child processes and get it into standard Python logging?

If it were a plain Python subprocess call, I could capture the I/O by providing pipes, as in https://codereview.stackexchange.com/questions/6567/redirecting-subprocesses-output-stdout-and-stderr-to-the-logging-module
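For reference, a minimal sketch of that pipe-based approach (my own simplification; run_and_log is just an illustrative name, not something from the linked post):

import logging
import subprocess

def run_and_log(cmd, logger):
    # Give the child pipes so the parent owns its stdout/stderr.
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    # Forward each line of child output to the logging module.
    for line in proc.stdout:
        logger.info(line.rstrip())
    return proc.wait()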

Others have tried and failed with loky before me, I realize. One option is to make sure a "setup logging" call is attached to each job pushed via joblib (sketched below). That could work, but as far as I know, sklearn does not expose that level of detail. See e.g. https://github.com/joblib/joblib/issues/1017
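For illustration, if I controlled the joblib calls myself, the idea would look roughly like this (a sketch only; with_logging and square are my own illustrative names, init_logging stands for the basicConfig setup above, and sklearn offers no hook to wrap its internal tasks this way):

import joblib

def with_logging(func):
    # Wrap a task so the worker configures logging before running it.
    def wrapped(*args, **kwargs):
        init_logging()
        return func(*args, **kwargs)
    return wrapped

def square(x):
    return x * x

results = joblib.Parallel(n_jobs=2)(
    joblib.delayed(with_logging(square))(i) for i in range(4)
)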


1 Answer


I did come up with a workaround using the dask backend instead. I define a worker plugin that is essentially my context manager:

import dask.distributed

class LogPlugin(dask.distributed.WorkerPlugin):
    name = "LoggerRedirector"

    def setup(self, worker: dask.distributed.Worker):
        # Runs in every worker: configure logging and redirect the standard streams.
        self.originals = sys.stdout, sys.stderr
        init_logging()
        sys.stdout = LogAdapter(level='INFO', logger=logging.getLogger(__name__))
        sys.stderr = LogAdapter(level='ERROR', logger=logging.getLogger(__name__))

    def teardown(self, worker: dask.distributed.Worker):
        # Restore the original streams when the worker shuts down.
        sys.stdout, sys.stderr = self.originals

and then register it on a dask client:

client = dask.distributed.Client()
client.register_worker_plugin(LogPlugin())

I can now get the desired output with multiprocessing.

It is an acceptable solution, but somewhat annoying, as dask has a larger overhead than loky and introduces a new dependency for me.

The new full code is:


import logging
import sys
import contextlib

class LogAdapter:
    def __init__(self, level, logger) -> None:
        if level == 'INFO':
            self.report = logger.info
        elif level == 'ERROR':
            self.report = logger.error

    def write(self, msg):
        stripped = msg.rstrip()
        if len(stripped) > 0:
            self.report(stripped)

    def flush(self):
        pass

@contextlib.contextmanager
def redirect_to_log(logger):
    originals = sys.stdout, sys.stderr
    sys.stdout = LogAdapter(level='INFO', logger=logger)
    sys.stderr = LogAdapter(level='ERROR', logger=logger)
    try:
        yield
    finally:
        sys.stdout, sys.stderr = originals

def init_logging():
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(process)d | %(asctime)s | %(name)14s | %(levelname)7s | %(message)s",
    )

import dask.distributed
class LogPlugin(dask.distributed.WorkerPlugin):
    name = "LoggerRedirector"

    def setup(self, worker: dask.distributed.Worker):
        self.originals = sys.stdout, sys.stderr
        init_logging()
        sys.stdout = LogAdapter(level='INFO',logger=logging.getLogger(__name__))
        sys.stderr = LogAdapter(level='ERROR',logger=logging.getLogger(__name__))

    def teardown(self, worker: dask.distributed.Worker):
        sys.stdout, sys.stderr = self.originals

def test_case():
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.utils import parallel_backend
    client = dask.distributed.Client()
    client.register_worker_plugin(LogPlugin())
    logger = logging.getLogger(__name__)
    init_logging()
    for backend_name in ['loky','threading','dask']:
        logger.info(f"Testing backend {backend_name}")
        with parallel_backend(backend_name), redirect_to_log(logger):
            clf = RandomForestClassifier(2, verbose=4)
            X = [[0, 0], [1, 1]]
            Y = [0, 1]
            clf = clf.fit(X, Y)

if __name__ == "__main__":
    test_case()
  • I won't accept this as an answer, since it shifts to dask. I post it more as a "this is a maybe useful non-answer that anyone might want...." – LudvigH Nov 30 '22 at 17:34
  • I wrote a bit more in my personal blog https://el-hult.github.io/2022/11/30/sklearn-logging.html in a style that might be interesting, but doesn't fit with the SO format. – LudvigH Feb 23 '23 at 10:52