I have a bunch of Python scripts that run some data science models. They take quite a while, and the only way to speed them up is multiprocessing. To achieve this I used the joblib library, and it works really well. Unfortunately, it messes up logging, and the console output is garbled (expectedly so), because all processes dump their respective outputs simultaneously.
I am new to the logging library and followed some other SO answers to try and get it to work. I am using 8 cores for processing. Following those answers, I wrote out to log files (roughly the pattern sketched below) and expected 8 new files every iteration; however, it created 8 files on the first iteration and only wrote/appended to those same 8 files on every subsequent loop. This was a little inconvenient, so I explored a bit more and found loguru and logzero. While both cover examples using multiprocessing, neither of them shows how to use it with joblib
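For reference, this is roughly the per-process setup I adapted from those answers (a minimal sketch; the file naming and the worker body are illustrative, not my actual code):

import logging
import os

def get_worker_logger():
    # one logger and one log file per worker process, named by PID
    logger = logging.getLogger('worker_{}'.format(os.getpid()))
    if not logger.handlers:  # avoid stacking duplicate handlers when the process is reused
        handler = logging.FileHandler('worker_{}.log'.format(os.getpid()))
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

def get_pred_per_process(cust_df, stock_id):
    logger = get_worker_logger()
    logger.info('Stock loop {}'.format(stock_id))
    # ... model code ...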
. Here is what I have so far:
run_models.py
import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger
import pandas as pd
import psutil
from joblib import Parallel, delayed
import helper
import log
import prep_data
import stock_subscriber_data
import train_model

def get_pred(cust_df, stock_id, logger):
    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)
    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)
    return True


def main():
    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()
    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)
    # single log file; enqueue=True is meant to make writes multiprocess-safe
    logger.add("test_loguru.log", format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)
    # one parallel task per stock; the loguru logger is passed to each worker
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logger) for s in stock_list)


if __name__ == "__main__":
    main()
train_model.py
import math
from datetime import datetime
from itertools import product
from math import sqrt
import pandas as pd
from keras import backend
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from numpy import array
from numpy import mean
from pandas import DataFrame
from pandas import concat
from sklearn.metrics import mean_squared_error
import helper
import stock_subscriber_data
# bunch of functions here that don't need logging...
# walk-forward validation for univariate data
def walk_forward_validation(logger, data, n_test, cfg):
    # ... do stuff here ...
    # ... and here ...
    logger.info('{0:.3f}'.format(error))
    return error, model


# score a model, return None on failure
def repeat_evaluate(logger, data, config, n_test, n_repeats=10):
    # ... do stuff here ...
    # ... and here ...
    logger.info('> Model{0} {1:.3f}'.format(key, result))
    return key, result, best_model


def read_train_write(data_df, stock_id, series, last_date, logger):
    # ... do stuff here ...
    # ... and here ...
    logger.info('done')
    # ... do stuff here ...
    # ... and here ...
    # bunch of logger.info() statements here...
    #
    #
    #
    #
    # ... do stuff here ...
    # ... and here ...
    return test_y, prd
This works well when there is only one process at a time. However, when running in multiprocess mode I get the following error: _pickle.PicklingError: Could not pickle the task to send it to the workers. What am I doing wrong? How can I remediate this? I don't mind switching to something other than loguru or logzero, as long as I can create one file with coherent logs, or even n files, each of which contains the logs of every iteration of joblib.
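For reference, the plain-multiprocessing pattern that loguru documents (and that I was trying to adapt to joblib) looks roughly like this. This is my own paraphrase assuming a fork start method, with illustrative names and values, not the exact example from their docs:

import multiprocessing
from loguru import logger

def worker(stock_id):
    # with a fork start method the child inherits the sink configured in main();
    # enqueue=True routes records through a queue so concurrent writes stay intact
    logger.info('processing stock {}', stock_id)

def main():
    logger.add("test_loguru.log", level="INFO", enqueue=True)
    with multiprocessing.Pool(8) as pool:
        pool.map(worker, range(16))

if __name__ == "__main__":
    main()

My joblib version above differs in that the logger object itself is passed as an argument to each delayed call.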