1

I am trying to use one of the Hugging Face models with ML flow. My input is a pyspark DataFrame. The issue is Mlflow doesn't support directly HuggingFace models, so need to use the flavor pyfunc to save it. So I need create a Python class that inherits from PythonModel and then place everything needed there. How can I use a pandas_udf function inside this PythonModel? It keeps failing because I haven't specified the hint type for all the parameters inside my pandas_udf.

class RobertaClassifier(PythonModel):

    def load_context(self, context: PythonModelContext):
        import os
        from transformers.models.auto import AutoConfig,   AutoModelForSequenceClassification
        from transformers.models.auto.tokenization_auto import AutoTokenizer
        
        config_file = os.path.dirname(context.artifacts["config"])
        self.config = AutoConfig.from_pretrained(config_file)
        self.tokenizer = AutoTokenizer.from_pretrained(config_file)
        self.model = AutoModelForSequenceClassification.from_pretrained(config_file, config=self.config)
        
        if torch.cuda.is_available():
            print('[INFO] Model is being sent to CUDA device as GPU is available')
            self.model = self.model.cuda()
        else:
            print('[INFO] Model will use CPU runtime')
        
        _ = self.model.eval()
    
    
    @pandas_udf("label string, score float")
    def predict_batch_udf(self, data: pd.Series) -> pd.Series:
        import torch
        import pandas as pd
        
        with torch.no_grad():
            inputs = preprocessing(data['content'])
            inputs = self.tokenizer(inputs, padding=True, return_tensors='pt', max_length=512, truncation=True)
        
            if self.model.device.index != None:
                torch.cuda.empty_cache()
                for key in inputs.keys():
                    inputs[key] = inputs[key].to(self.model.device.index)

            predictions = self.model(**inputs)
            probs = torch.nn.Softmax(dim=1)(predictions.logits)
            probs = probs.detach().cpu().numpy()

            labels = probs.argmax(axis=1)
            scores = probs.max(axis=1)

            return labels, scores
        
    def predict(self, context: PythonModelContext, data: pd.Series) -> pd.Series:
        import math
        import numpy as np
        
        batch_size = 64
        sample_size = len(data)
        
        labels = np.zeros(sample_size)
        scores = np.zeros(sample_size)

        for batch_idx in range(0, math.ceil(sample_size / batch_size)):
            bfrom = batch_idx * batch_size
            bto = bfrom + batch_size
            
            l, s = self._predict_batch(data.iloc[bfrom:bto])
            labels[bfrom:bto] = l
            scores[bfrom:bto] = s
            
        return pd.DataFrame({'label': [self.config.id2label[l] for l in labels], 
                             'score': scores })
Alex Ott
  • 80,552
  • 8
  • 87
  • 132
Anna_v
  • 11
  • 1

0 Answers0