I'd assume that most frameworks like Keras/TensorFlow automatically use all CPU cores, but in practice they don't seem to. I could find only a few sources on how to use the full capacity of the CPU during deep learning. I found an article that uses the following:

from multiprocessing import Pool 
import psutil
import ray 

On the other hand, this answer on using a Keras model in multiple processes makes no mention of the libraries above. Is there a more elegant way to take advantage of multiprocessing for Keras, given how popular it is?

  • For instance, how can I modify the following simple RNN implementation to use at least 50% of the CPU's capacity during training?

  • Should I train a second model in parallel, such as the LSTM that I have commented out below? I mean, can we run several models simultaneously to use more of the CPU's capacity?

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Activation, Dense
from keras.layers import Dropout
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
from keras.callbacks import EarlyStopping, ReduceLROnPlateau

df = pd.read_csv(r"D:\Train.csv", header=None)  # raw string keeps the backslash literal

index = [i for i in list(range(1440)) if i%3==2]

Y_train= df[index]
df = df.values

#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
    dataX,dataY = [],[]
    print("Len:",len(dataset)-look_back-1)
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back,  :])
    return np.array(dataX), np.array(dataY)

Y_train=np.array(Y_train)
df=np.array(df)

look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)

#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)

#Shape of train and test data
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)


model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)


#predict

Y_train=np.array(trainY)
Y_test=np.array(testY)

Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)

train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)

# create and fit the Simple LSTM model as 2nd model for multi-tasking

#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)

#Y_train=np.array(trainY)
#Y_test=np.array(testY)

#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)

#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)

#plot losses for RNN + LSTM
#(the LSTM panel needs hist_LSTM, so uncomment the LSTM block above first)
f, ax = plt.subplots(figsize=(20, 15))
plt.subplot(1, 2, 1)
plt.plot(hist_RNN.history['loss']    ,label='Train loss')
plt.plot(hist_RNN.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title(' RNN Loss on Train and Test data')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(hist_LSTM.history['loss']    ,label='Train loss')
plt.plot(hist_LSTM.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title('LSTM Loss on Train and Test data')
plt.legend()

plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
#plt.savefig('All_Losses_history_.png')
plt.show()

Note: I don't have access to CUDA; I only have access to a powerful server without a graphics card. My aim is to take advantage of multiprocessing and multithreading to use the maximum capacity of the CPU instead of 30%, which means just one core while I have a quad-core! Any advice would be greatly appreciated. I have uploaded a formatted CSV dataset.

Update: my HW configuration is following:

  • CPU: AMD A8-7650K Radeon R7 10 Compute Cores 4C+6G 3.30 GHz
  • RAM: 16GB
  • OS: Win 7
  • Python ver 3.6.6
  • Tensorflow ver 1.8.0
  • Keras ver 2.2.4
Mario
  • Maybe [this](https://stackoverflow.com/questions/42504669/keras-tensorflow-and-multiprocessing-in-python) is helpful for your problem? – Baptiste Pouthier May 28 '19 at 14:41
  • @BaptistePouthier thanks for the tip, but I really need someone to write an answer for the simple example above so that I can understand the concept and extend it to my other, more complex NN models. – Mario May 28 '19 at 14:53
  • @BaptistePouthier do u have any idea? – Mario May 31 '19 at 13:10
  • can you post the output of "grep cores /proc/cpuinfo" ? also TF and Keras version – Salih Karagoz May 31 '19 at 22:54
  • Keras uses all CPU cores by default. I tested your code on my 8-core server and it reaches 86 percent CPU usage. I think you may have a different problem. Did you check with htop/top? Please post your results based on https://rosettacode.org/wiki/Linux_CPU_utilization – Salih Karagoz May 31 '19 at 23:06
  • I think one thing that you can do is increasing the batch size – Salih Karagoz May 31 '19 at 23:08
  • @SalihKaragoz I checked the CPU performance on my desktop computer in the laboratory, which as far as I remember has a quad-core CPU with 16GB RAM, but I will check and update it tomorrow. Keras ver `2.2.4` & TF `1.10.0`. The CPU usage was 30% of capacity!! I can also access a powerful server without a graphics card, but although Keras should automatically use at least 80% of CPU capacity, in practice it is 30% for me!! When I increase `batch_size` on my desktop computer, it crashes! Someone recommended that I use `multiprocessing` on the server! – Mario May 31 '19 at 23:20
  • @SalihKaragoz is it possible to modify the code in such a way that I can train several models simultaneously? For example, an RNN and an LSTM at the same time using multiprocessing libraries? Is it possible to allocate one or more CPU cores to processing/training models? What's your opinion? – Mario May 31 '19 at 23:28
  • @SalihKaragoz I have updated my HW configuration. I would be happy if you have any idea to get right results as well. – Mario Jun 04 '19 at 16:33
  • @Mario well, actually I have not worked at the CPU level. Maybe the problem is related to how the framework works with the OS. Normally Keras handles it by default, but I don't know in your case. Good luck. – Salih Karagoz Jun 04 '19 at 22:01
  • Concerning multiprocessing, this might also be interesting for you, even though it's only loosely related to your question: https://stackoverflow.com/questions/56441216/keras-on-fit-generator-and-thread-safety – Markus Jun 05 '19 at 17:27

1 Answer

It's a good thing that training one model doesn't use all 100% of your CPU! Now we have space to train multiple models in parallel and speed up your overall training times.

NB: If you want to just speed up this model, look into GPUs or changing the hyperparameters like batch size and number of neurons (layer size).

Here's how you can use multiprocessing to train multiple models at the same time (using processes running in parallel on each separate CPU core of your machine).

The multiprocessing.Pool creates a pool of worker processes and hands them the jobs that need doing. Each process picks up a job and runs it. When the job is finished, the process picks up the next one.

import time
import signal
import multiprocessing

def init_worker():
    ''' Ignore SIGINT in the workers so that Ctrl+C (KeyboardInterrupt) is handled by the parent process '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # lets demonstrate with a sleep timer
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # you can also return values eg. the eval score
    return model_RNN.evaluate(...)


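if __name__ == '__main__':
    # this guard is required on Windows, where each worker re-imports this module
    num_workers = 4
    hyperparams = [800, 960, 1100]

    pool = multiprocessing.Pool(num_workers, init_worker)

    scores = pool.map(train_model, hyperparams)
    pool.close()
    pool.join()

    print(scores)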

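Output (illustrative; the three start messages can appear in any order, while pool.map returns the results in the order of hyperparams):

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':800,'score':1.2}, {'size':960,'score':1.0}, {'size':1100,'score':0.7}]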

This is easily demonstrated with a time.sleep in the code. You'll see that all 3 processes start their training jobs, and then they all finish at about the same time. If this ran in a single process, you'd have to wait for each job to finish before starting the next (yawn!).
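
To make the timing claim concrete, here is a minimal, standard-library-only sketch that compares running three stand-in jobs one after another with running them in a pool. The names fake_train, run_serial, run_parallel and SLEEP_SECONDS are hypothetical and not part of the code above; the sleep only imitates the time spent in fit, so the numbers say nothing about real training speed.

```python
import multiprocessing
import time

SLEEP_SECONDS = 0.5  # hypothetical stand-in for the time one training job takes


def fake_train(layer_size):
    """Stand-in for a training job: sleeps instead of fitting a model."""
    time.sleep(SLEEP_SECONDS)
    return {'size': layer_size}


def run_serial(sizes):
    """Run the jobs one after another in the current process."""
    return [fake_train(size) for size in sizes]


def run_parallel(sizes, num_workers):
    """Run the jobs in a pool of worker processes."""
    with multiprocessing.Pool(num_workers) as pool:
        # pool.map returns the results in the same order as `sizes`
        return pool.map(fake_train, sizes)


if __name__ == '__main__':
    sizes = [800, 960, 1100]

    start = time.perf_counter()
    run_serial(sizes)
    print(f'serial:   {time.perf_counter() - start:.1f} s')

    start = time.perf_counter()
    run_parallel(sizes, num_workers=3)
    print(f'parallel: {time.perf_counter() - start:.1f} s')
```

With three workers the parallel run should take roughly the time of one job plus the cost of starting the processes, while the serial run takes about three times as long.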

EDIT OP also wanted full code. This is difficult on Stack Overflow because I can't test in your environment and with your code. I've taken the liberty of copying and pasting your code into my template above. You may need to add some imports but this is as close as you'll get to "runnable" and "full" code.

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    ''' Ignore SIGINT in the workers so that Ctrl+C (KeyboardInterrupt) is handled by the parent process '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains one model of the given type ('rnn' or 'lstm')
    It returns the train and test scores (MSE)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()

    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values eg. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------

df = pd.read_csv(r"D:\Train.csv", header=None)  # raw string keeps the backslash literal

index = [i for i in list(range(1440)) if i % 3 == 2]

Y_train = df[index]
df = df.values

# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------

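if __name__ == '__main__':
    # this guard is required on Windows, where each worker re-imports this
    # module (which also reloads the data above inside each worker)
    num_workers = 2
    model_types = ['rnn', 'lstm']

    pool = multiprocessing.Pool(num_workers, init_worker)

    scores = pool.map(train_model, model_types)
    pool.close()
    pool.join()

    print(scores)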

Output of the program:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, 
 {'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
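
One thing to watch when several models train at once: by default each TensorFlow process typically sizes its thread pools for all cores, so the workers can end up competing for the same cores. Below is a rough sketch of one way to split the cores between workers. It assumes TensorFlow 1.x with standalone Keras, as in the question; threads_per_worker and limit_tensorflow_threads are hypothetical helper names, and using the same value for both thread pools is only a starting point, not a tuned setting.

```python
import os


def threads_per_worker(num_workers, total_cores=None):
    """Split the available cores evenly between workers (at least 1 each)."""
    if total_cores is None:
        total_cores = os.cpu_count() or 1  # os.cpu_count() can return None
    return max(1, total_cores // num_workers)


def limit_tensorflow_threads(num_threads):
    """Cap the TensorFlow thread pools of the current process.

    Assumption: TensorFlow 1.x and standalone Keras are installed.
    Intended to be called at the top of train_model, inside each worker.
    """
    import tensorflow as tf
    from keras import backend as K

    config = tf.ConfigProto(
        intra_op_parallelism_threads=num_threads,  # threads used inside one op
        inter_op_parallelism_threads=num_threads,  # independent ops run at once
    )
    K.set_session(tf.Session(config=config))


if __name__ == '__main__':
    # quad-core machine from the question, two models trained at the same time
    print(threads_per_worker(num_workers=2, total_cores=4))
```

Whether this helps depends on the model and the TensorFlow build, so it is worth checking CPU usage with and without the cap.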
repoleved
  • Comments are not for extended discussion; this conversation has been [moved to chat](https://chat.stackoverflow.com/rooms/194507/discussion-on-answer-by-repoleved-how-can-take-advantage-of-multiprocessing-and). – George Stocker Jun 05 '19 at 17:35
  • @GeorgeStocker would you close the chat? It's done! I'm going to accept the answer. Thanks – Mario Jun 06 '19 at 10:44