I am trying to use one of the Hugging Face models with MLflow. My input is a PySpark DataFrame. The issue is that MLflow doesn't support Hugging Face models directly, so I need to use the pyfunc flavor to save it. That means creating a Python class that inherits from PythonModel and putting everything needed inside it. How can I use a pandas_udf inside this PythonModel? It keeps failing because I haven't specified a type hint for every parameter of my pandas_udf (the self parameter has no hint).
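For reference, a standalone pandas_udf with full type hints works fine for me on the same DataFrame. A minimal sketch (the content column and the upper-casing are placeholders, not my real preprocessing):

from pyspark.sql.functions import pandas_udf
import pandas as pd

# A self-contained pandas_udf: the parameter and the return value both carry
# type hints, which is what Spark uses to build the UDF signature.
@pandas_udf("string")
def upper_udf(text: pd.Series) -> pd.Series:
    return text.str.upper()

# applied as: df.withColumn("content_upper", upper_udf("content"))

Inside the PythonModel, though, the same pattern breaks. This is my current class: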
import pandas as pd
from pyspark.sql.functions import pandas_udf
from mlflow.pyfunc import PythonModel, PythonModelContext


class RobertaClassifier(PythonModel):
    def load_context(self, context: PythonModelContext):
        import os
        import torch
        from transformers.models.auto import AutoConfig, AutoModelForSequenceClassification
        from transformers.models.auto.tokenization_auto import AutoTokenizer

        # The "config" artifact points to a file inside the model directory,
        # so its parent directory is what from_pretrained() expects.
        config_file = os.path.dirname(context.artifacts["config"])
        self.config = AutoConfig.from_pretrained(config_file)
        self.tokenizer = AutoTokenizer.from_pretrained(config_file)
        self.model = AutoModelForSequenceClassification.from_pretrained(config_file, config=self.config)

        if torch.cuda.is_available():
            print('[INFO] Model is being sent to CUDA device as GPU is available')
            self.model = self.model.cuda()
        else:
            print('[INFO] Model will use CPU runtime')
        _ = self.model.eval()
    # This decorator is where it fails: pandas_udf requires a type hint for
    # every parameter, and `self` does not have one.
    @pandas_udf("label string, score float")
    def predict_batch_udf(self, data: pd.Series) -> pd.Series:
        import torch
        import pandas as pd

        with torch.no_grad():
            # preprocessing() is a helper defined elsewhere in my code (omitted here)
            inputs = preprocessing(data['content'])
            inputs = self.tokenizer(inputs, padding=True, return_tensors='pt', max_length=512, truncation=True)

            if self.model.device.index is not None:
                torch.cuda.empty_cache()
                for key in inputs.keys():
                    inputs[key] = inputs[key].to(self.model.device.index)

            predictions = self.model(**inputs)
            probs = torch.nn.Softmax(dim=1)(predictions.logits)
            probs = probs.detach().cpu().numpy()

            labels = probs.argmax(axis=1)
            scores = probs.max(axis=1)

        return labels, scores
    def predict(self, context: PythonModelContext, data: pd.DataFrame) -> pd.DataFrame:
        import math
        import numpy as np

        batch_size = 64
        sample_size = len(data)
        labels = np.zeros(sample_size, dtype=int)
        scores = np.zeros(sample_size)

        # Score the input in fixed-size batches to keep GPU memory in check.
        for batch_idx in range(0, math.ceil(sample_size / batch_size)):
            bfrom = batch_idx * batch_size
            bto = bfrom + batch_size
            l, s = self.predict_batch_udf(data.iloc[bfrom:bto])
            labels[bfrom:bto] = l
            scores[bfrom:bto] = s

        return pd.DataFrame({'label': [self.config.id2label[l] for l in labels],
                             'score': scores})
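For completeness, this is roughly how I log the model (the artifact path and local model directory are placeholders, not my exact values):

import mlflow

# The "config" artifact key is what load_context() reads back through
# context.artifacts["config"]; the path is a placeholder for the directory
# holding the pretrained model files.
with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="roberta_classifier",
        python_model=RobertaClassifier(),
        artifacts={"config": "/tmp/roberta/config.json"},
    )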