
I have a bunch of Python scripts that run some data science models. They take quite a while to run, and the only way to speed them up is to use multiprocessing. To achieve this, I used the joblib library, and it works really well. Unfortunately, however, this messes up logging, and the console output is garbled too (expectedly so), because all processes dump their respective outputs simultaneously.

I am new to the logging library and followed some other SO answers to try and get it to work. I am using 8 cores for processing. Using the answers on SO, I wrote out to log files and expected 8 new files every iteration. However, it created 8 files on the first iteration and only wrote/appended to those same 8 files on every subsequent loop. This was a little inconvenient, so I explored a little more and found loguru and logzero. While they both cover examples using multiprocessing, neither of them shows how to use it with joblib. Here is what I have so far:

run_models.py

import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger

import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id, logger):

    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))

    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)  
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)

    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)

    return True


def main():

    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()

    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)

    logger.add("test_loguru.log", format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logger) for s in stock_list)


if __name__ == "__main__":
    main()

train_model.py

import math
from datetime import datetime
from itertools import product
from math import sqrt

import pandas as pd
from keras import backend
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from numpy import array
from numpy import mean
from pandas import DataFrame
from pandas import concat
from sklearn.metrics import mean_squared_error

import helper
import stock_subscriber_data

# bunch of functions here that don't need logging...

# walk-forward validation for univariate data
def walk_forward_validation(logger, data, n_test, cfg):
    #... do stuff here ...
    #... and here ...
    logger.info('{0:.3f}'.format(error))
    return error, model


# score a model, return None on failure
def repeat_evaluate(logger, data, config, n_test, n_repeats=10):
    #... do stuff here ...
    #... and here ...
    logger.info('> Model{0} {1:.3f}'.format(key, result))
    return key, result, best_model



def read_train_write(data_df, stock_id, series, last_date, logger):
    #... do stuff here ...
    #... and here ...
    logger.info('done')

    #... do stuff here ...
    #... and here ...

    # bunch of logger.info() statements here... 
    #
    #
    #
    #

    #... do stuff here ...
    #... and here ...

    return test_y, prd

This works well when there is only one process at a time. However, when running in multiprocessing mode I get a `_pickle.PicklingError: Could not pickle the task to send it to the workers.` error. What am I doing wrong? How can I remediate this? I don't mind switching to something other than loguru or logzero, as long as I can create one file with coherent logs, or even n files, one for each iteration of joblib.

CodingInCircles
  • ***`_pickle.PicklingError:`***: I feel this has nothing to do with logging; as it stands, you are only logging strings. [Edit] your question and show the **full traceback**. ***"the console output is garbled"***: Follow this answer: [print-overwriting-itself](https://stackoverflow.com/questions/49130540/python-threading-print-overwriting-itself) – stovfl Dec 21 '19 at 08:38
  • By garbled, I mean that the output of each of the processes is written to the console and it becomes impossible to know "who" is writing what. As such, I don't have the issue in the question you've linked to. – CodingInCircles Dec 21 '19 at 23:05
  • ***"becomes impossible to know "who" is writing what"***: That's what the linked answer solves by serializing output into a `Queue`. The result is **one printing point**, so no garbled output is possible. – stovfl Dec 21 '19 at 23:18
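
To illustrate the pattern the linked answer describes, here is a minimal sketch (the process names and messages are placeholders): the workers never print directly, they push their messages onto a multiprocessing.Queue, and a single consumer drains the queue and does all the printing, so there is only one point of output.

import multiprocessing as mp


def worker(q, name):
    # Workers never print directly; they put their messages on the queue.
    for i in range(3):
        q.put('{}: step {}'.format(name, i))
    q.put(None)  # sentinel: this worker is finished


def main():
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(q, 'proc-{}'.format(n))) for n in range(4)]
    for p in procs:
        p.start()

    # A single consumer does all the printing, so lines cannot interleave.
    finished = 0
    while finished < len(procs):
        msg = q.get()
        if msg is None:
            finished += 1
        else:
            print(msg)

    for p in procs:
        p.join()


if __name__ == '__main__':
    main()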

3 Answers


Core concept: The main process AND each child process need to call logger.add(). You can use the same filename to pipe all logs to the same file.

# Pseudo-code to get the idea
def main():
    logfile = 'execution.log'
    # Use enqueue to ensure works properly with multiprocessing
    logger.add(logfile, enqueue=True)
    ...
    # Add logfile to the params passed to get_pred
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logfile) for s in stock_list)

# Add logfile as param to get_pred
def get_pred(cust_df, stock_id, logfile):
    # Add the *same* logfile each time the child process is called!
    logger.add(logfile, enqueue=True)
    # Add identifiers to log messages to distinguish them
    logger.info(f'{stock_id} - more info')
    # ...

In @CodingInCircles's answer, in the get_pred() function, they call logger.add() with a unique logfile name each time, which creates many different log files.

Instead, you can call logger.add() with the same name each time and all the logs will go to the same log file. Setting enqueue=True will help ensure it works correctly with multiprocessing.

To know which log corresponds to which thing (stocks in our case), just add the stock name to the log message e.g. logger.info(f'{stock_id} - more info')

Moreover, I've found that adding backtrace=True, diagnose=True as params in logger.add() is particularly helpful when using Loguru.

Final note: you could also define LOGFILE = 'execution.log' as a constant outside of the function definitions; then you would not need to pass it as a param to get_pred(). The method outlined above lets you do more with the logfile name, though, such as giving it a unique time signature.
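
Putting this together, here is a minimal, self-contained sketch of the approach (the filename, the placeholder stock list and the worker body are illustrative, not taken from the question; the in-worker import is just a precaution so that joblib only has to pickle plain strings, never the parent's logger object, which may not pickle cleanly once an enqueue'd handler has been added):

from joblib import Parallel, delayed
from loguru import logger

LOGFILE = 'execution.log'  # shared by the parent and every worker


def get_pred(stock_id, logfile):
    # Re-import inside the worker: this way the task only carries strings,
    # not the parent's logger object (which may not pickle cleanly once an
    # enqueue'd handler has been added to it).
    from loguru import logger

    # Each child adds the *same* file sink; enqueue=True serializes the writes.
    logger.add(logfile, enqueue=True, backtrace=True, diagnose=True)

    # Prefix every message with an identifier so interleaved logs stay readable.
    logger.info(f'{stock_id} - starting')
    # ... the real per-stock work would go here ...
    logger.info(f'{stock_id} - done')
    return stock_id


def main():
    # The parent logs to the same file as the workers.
    logger.add(LOGFILE, enqueue=True, backtrace=True, diagnose=True)

    stock_list = ['AAA', 'BBB', 'CCC', 'DDD']  # placeholder data
    results = Parallel(n_jobs=2)(
        delayed(get_pred)(s, LOGFILE) for s in stock_list
    )
    logger.info(f'finished: {results}')


if __name__ == '__main__':
    main()

With this, execution.log collects the output of the parent and of every worker, and the stock_id prefix keeps the interleaved lines attributable.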

codeananda

I got it to work by modifying my run_models.py. Now, I have one log file per loop. This creates a LOT of log files, but they're all relevant to each loop and not jumbled or anything. One step at a time, I guess. Here's what I did:

run_models.py

import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger

import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id):

    log_file_name = "log_file_{}".format(stock_id)
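    # One log file per stock: each worker process adds its own sink with a
    # per-stock filename below, so every loop writes to its own file
    # (enqueue=True keeps the sink safe to use with multiprocessing).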

    logger.add(log_file_name, format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))

    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)  
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)

    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)

    return True


def main():

    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()

    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)

    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s) for s in stock_list)


if __name__ == "__main__":
    main()
CodingInCircles

So the proper way to use loguru with joblib is to change the backend to multiprocessing.

import sys

from joblib import Parallel, delayed
from loguru import logger
from tqdm.autonotebook import tqdm

logger.remove()
logger.add(sys.stdout, level='INFO', enqueue=True)

logger.info('test')
logger.debug('should not appear')

def do_thing(i):
    logger.info('item %i' %i)
    logger.debug('should not appear')
    return None


Parallel(n_jobs=4, backend='multiprocessing')(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)



Parallel(n_jobs=4)(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)

The first Parallel call works. The second one runs into the old issue you mentioned earlier.

hongy