
I am loading my pre-trained Keras model and then trying to parallelize predictions over a large amount of input data using Dask. Unfortunately, I'm running into some issues relating to how I'm creating my Dask array. Any guidance would be greatly appreciated!

Setup:

First I cloned from this repo https://github.com/sanchit2843/dlworkshop.git

Reproducible Code Example:

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.model_selection import train_test_split
from keras.models import load_model
import keras
from keras.models import Sequential
from keras.layers import Dense
from dask.distributed import Client
import dask.array as da
from keras import backend as K  # used by the custom contrastive_loss below
import mlflow
import mlflow.keras
import warnings
warnings.filterwarnings('ignore')

dataset = pd.read_csv('data/train.csv')
X = dataset.drop(['price_range'], axis=1).values
y = dataset[['price_range']].values

# scale data
sc = StandardScaler()
X = sc.fit_transform(X)
ohe = OneHotEncoder()
y = ohe.fit_transform(y).toarray()

X_train,X_test,y_train,y_test = train_test_split(X,y,test_size = 0.2)

# Neural network
model = Sequential()
model.add(Dense(16, input_dim=20, activation="relu"))
model.add(Dense(12, activation="relu"))
model.add(Dense(4, activation="softmax"))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=100, batch_size=64)

# Use dask
client = Client()
def load_and_predict(input_data_chunk):

    # custom loss the saved model was trained with; needed to reload it
    def contrastive_loss(y_true, y_pred):
        margin = 1
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

    mlflow.set_tracking_uri('<uri>')
    mlflow.set_experiment('clean_parties_ml')
    runs = mlflow.search_runs()
    artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
    model = mlflow.keras.load_model(artifact_uri + '/model', custom_objects={'contrastive_loss': contrastive_loss})
    y_pred = model.predict(input_data_chunk)
    return y_pred

# chunk only along the rows; each 100-row block is passed to load_and_predict
da_input_data = da.from_array(X_test, chunks=(100, None))
prediction_results = da_input_data.map_blocks(load_and_predict, dtype=X_test.dtype).compute()

The Error I'm receiving:

AttributeError: '_thread._local' object has no attribute 'value'
Riley Hun

2 Answers


Code speaks best, I think. I ended up using the delayed interface in Dask to distribute a VGG16 model with modified layers to extract features from images. And yes, I have this working with Dask distributed. It goes a bit like this:

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.models import Model

def calculate_features(model, image):
    # image arrives as raw bytes; rebuild a 1x224x224x3 frame for VGG16
    frame = np.frombuffer(image, dtype=np.uint8)
    frame = frame.reshape(1, 224, 224, 3)
    frame = preprocess_input(frame)
    features = model.predict(frame, verbose=0)
    return features.tobytes()

@dask.delayed
def build_vgg16_model():
    vgg16 = VGG16()
    vgg16 = Model(inputs=vgg16.input, outputs=vgg16.layers[-2].output)
    return vgg16

@dask.delayed
def get_features(model, df):
    rows = []
    for row in df.itertuples():
        features = calculate_features(model, row.image)
        
        rows.append([features])
        
    return pd.DataFrame(rows, columns=['features'])


# minio_options holds the S3/MinIO credentials and endpoint config (defined elsewhere)
dfi = dd.read_parquet(
    's3://bucket/images',
    engine='pyarrow',
    storage_options=minio_options
)

dfid = dfi.to_delayed(optimize_graph=True)

model = build_vgg16_model()
features = [get_features(model, df) for df in dfid]

dff = dd.from_delayed(features, verify_meta=False, meta={
    'features': 'bytes'
})

Now, interesting to note: the model ended up on a single machine, and CPU usage was limited to that machine. Boo... I think this method only loads the model once in the Dask graph and sends it to the first worker that wants it. I think map_blocks or map_partitions would have tried to distribute the model to EVERY single partition across the nodes, but that can be highly inefficient and lead to large amounts of network traffic and memory usage. (The VGG16 model is a bit big.)
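
For contrast, here's a minimal sketch (my assumption, not code I actually ran) of the map_partitions route described above. Because the model object is captured directly by the partition function instead of being built lazily, it gets shipped along with every partition's task rather than loaded once in the graph:

vgg16 = VGG16()
model = Model(inputs=vgg16.input, outputs=vgg16.layers[-2].output)

def features_partition(df):
    # same per-row feature extraction as get_features above
    rows = [[calculate_features(model, row.image)] for row in df.itertuples()]
    return pd.DataFrame(rows, columns=['features'])

dff = dfi.map_partitions(features_partition, meta={'features': 'object'})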

I can kind of control that with a little trick like this...

def partition(arr, chunks):
    # split the list of delayed partitions into `chunks` round-robin groups
    return [arr[i::chunks] for i in range(chunks)]

features = []
for chunk in partition(dfid, 16):
    model = build_vgg16_model()
    features.extend([get_features(model, df) for df in chunk])

Then call dd.from_delayed on the features as before. This limits the model to being loaded only 16 times in the Dask graph. I saw significantly better distribution of CPU load with this approach, without the added network overhead.

I haven't played with this much further, but I'd hope to load the model a number of times equal to the number of workers I have, make sure each worker gets a model, and then start streaming in the partitions. I'll update my post as my investigation continues...
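
In the meantime, here's a minimal sketch of that idea (an untested assumption on my part): cache the model on each worker the first time a task needs it, using dask.distributed's get_worker(). The helper names (_worker_model, get_features_cached) are hypothetical:

import dask
import pandas as pd
from dask.distributed import get_worker
from keras.applications.vgg16 import VGG16
from keras.models import Model

def _worker_model():
    # build the model at most once per worker and stash it on the worker object
    worker = get_worker()
    if not hasattr(worker, '_vgg16_features'):
        vgg16 = VGG16()
        # same modified-output model as above
        worker._vgg16_features = Model(inputs=vgg16.input, outputs=vgg16.layers[-2].output)
    return worker._vgg16_features

@dask.delayed
def get_features_cached(df):
    model = _worker_model()  # loaded at most once per worker process
    rows = [[calculate_features(model, row.image)] for row in df.itertuples()]
    return pd.DataFrame(rows, columns=['features'])

features = [get_features_cached(df) for df in dfid]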

VocoJax

Keras/TensorFlow don't play nicely with other threaded systems. There is an ongoing issue on this topic here: https://github.com/dask/dask-examples/issues/35
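
A common workaround (a sketch based on my assumptions, not from the linked issue) is to keep Keras out of Dask's worker threads: run process-based workers with a single thread each, and do the model loading and prediction entirely inside the task. 'model.h5' is just a placeholder path:

from dask.distributed import Client
from keras.models import load_model

client = Client(processes=True, threads_per_worker=1)

def predict_chunk(chunk):
    model = load_model('model.h5')  # load inside the task, on the worker
    return model.predict(chunk)

# then map predict_chunk over the Dask array blocks, as in the question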

MRocklin