18

I am trying to learn, in a very simple way, how luigi works. As a newbie I came up with this code:

import luigi


class class1(luigi.Task):

    def requires(self):
        return class2()

    def output(self):
        return luigi.LocalTarget('class1.txt')

    def run(self):
        print('IN class A')


class class2(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('class2.txt')


if __name__ == '__main__':
    luigi.run()

Running this from the command prompt fails at this line:

raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))

which is:

RuntimeError: Unfulfilled dependency at run time: class2__99914b932b  
   
KansaiRobot
Fact
  • What command are you using to run it? Your `run()` methods need to create the output files referenced in the `LocalTarget` in order for the dependencies to be met. – MattMcKnight Mar 31 '17 at 12:21

4 Answers

21

This happens because you define an output for class2 but never create it.

Let's break it down...

When running

python file.py class2 --local-scheduler

luigi will ask:

  • is the output of class2 already on disk? NO
  • check dependencies of class2: NONE
  • execute the run method (by default it is an empty method that just does pass)
  • the run method didn't raise any errors, so the job finishes successfully.

However, when running

python file.py class1 --local-scheduler

luigi will ask:

  • is the output of class1 already on disk? NO
  • check task dependencies: YES: class2
  • pause to check status of class2
    • is the output of class2 on disk? NO
    • run class2 -> running -> done without errors
    • is the output of class2 on disk? NO -> raise error

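luigi never runs a task unless all of its dependencies are met (i.e. their outputs are on the file system). Here class2 ran without errors but never wrote class2.txt, so the dependency of class1 is still unfulfilled.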
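The two walk-throughs above can be reproduced with a small toy model. This is only a sketch of the checks described in this answer, not luigi's actual implementation: `ToyTask`, `build` and `demo` are hypothetical names, and the `writes_output` flag stands in for whether `run()` creates the file declared in `output()`.

```python
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for luigi.Task; this is not luigi's real API."""

    def __init__(self, name, directory, deps=(), writes_output=True):
        self.name = name
        self.path = os.path.join(directory, name + ".txt")
        self.deps = list(deps)
        self.writes_output = writes_output

    def complete(self):
        # A task counts as done only if its output file exists on disk.
        return os.path.exists(self.path)

    def run(self):
        # With writes_output=False this mimics an empty run() method.
        if self.writes_output:
            with open(self.path, "w") as handle:
                handle.write("done\n")


def build(task):
    """Run the dependencies first, then re-check their outputs before running `task`."""
    if task.complete():
        return
    for dep in task.deps:
        build(dep)
    missing = [dep.name for dep in task.deps if not dep.complete()]
    if missing:
        raise RuntimeError("Unfulfilled dependency at run time: " + ", ".join(missing))
    task.run()


def demo(class2_writes_output):
    """Return 'ok' or the error message for a class1 -> class2 pipeline."""
    with tempfile.TemporaryDirectory() as directory:
        class2 = ToyTask("class2", directory, writes_output=class2_writes_output)
        class1 = ToyTask("class1", directory, deps=[class2])
        try:
            build(class1)
        except RuntimeError as error:
            return str(error)
        return "ok"
```

In this model, `demo(False)` returns the "Unfulfilled dependency" message because class2 ran but left no file behind, while `demo(True)` returns `"ok"`. Building a task that has no dependencies and an empty `run()` raises nothing, which matches the first walk-through.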

mjalajel
1

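This error also occurs when output() points to a target that is never actually created. For example, if the output folder name is built from a timestamp, the timestamp changes every second, so the path that output() returns when luigi checks for it never matches the path that was written. The dependency therefore always looks unfulfilled and the error is raised.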
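A minimal sketch of the timestamp problem, using plain Python rather than luigi so that it stays self-contained. The class names and the fake clock are hypothetical and exist only for illustration. The first class rebuilds its path from the clock on every call, the second reads the clock once and reuses the value.

```python
import itertools


class UnstableOutputTask:
    """Hypothetical task: output() reads the clock on every call."""

    def __init__(self, clock):
        self.clock = clock

    def output(self):
        # A new timestamp on each call means a new path on each call.
        return "results_{}/data.csv".format(self.clock())


class StableOutputTask:
    """Hypothetical task: the timestamp is fixed once, at construction."""

    def __init__(self, clock):
        self.stamp = clock()

    def output(self):
        # The same stored timestamp gives the same path every time.
        return "results_{}/data.csv".format(self.stamp)


def make_fake_clock(start=1000):
    """Return a callable that yields a new 'second' on each call."""
    return itertools.count(start).__next__
```

With `UnstableOutputTask(make_fake_clock())`, two consecutive calls to `output()` return different paths, so a file written to the first path is never found at the second. With `StableOutputTask`, both calls return the same path. In luigi itself, a common way to get the same stability is to pass the timestamp in as a task parameter instead of reading the clock inside `output()`.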

1

I had the same error, but I still haven't found the cause. Here is my code:

import logging

import joblib
import luigi
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

logger = logging.getLogger(__name__)


class data_ingestion(luigi.Task):

    def run(self):
        data = pd.read_csv(r'F:\Mega\MEGAsync\VS Code\winequality-red.csv', sep=';')
        data.to_csv(self.output().path, index=False)

    def output(self):
        return luigi.LocalTarget('WineQuality.csv')


class data_prep(luigi.Task):

    def requires(self):
        return data_ingestion()

    def output(self):
        return [luigi.LocalTarget('Train.csv'), luigi.LocalTarget('Val.csv')]

    def run(self):
        data = pd.read_csv('WineQuality.csv')  # Read from a CSV
        logger.info('\n Quick look at the data')
        data.head()

        column_target = 'quality'  # Target variable to predict
        columns_features = data.drop([column_target], axis=1)

        logger.info(f'=== Variable to be predicted: {column_target}')
        logger.info(f'=== Available features: {columns_features}')

        logger.info('Splitting the dataset into TRAIN and TEST (validation)')
        data_train, data_val = train_test_split(data, test_size=0.2, stratify=data[column_target], random_state=1)

        logger.info("Saving train file")
        data_train.to_csv(self.output()[0].path, index=False)

        logger.info("Saving val file")
        data_val.to_csv(self.output()[1].path, index=False)


class training(luigi.Task):

    def requires(self):
        return data_prep()

    def output(self):
        return luigi.LocalTarget('joblibe_file')

    def run(self):
        data_train = pd.read_csv(self.input()[0].path)

        column_target = 'quality'  # Target variable to predict
        data_features = data_train.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()

        X_train = data_train[columns_features].values
        Y_train = data_train[column_target].values

        model = DecisionTreeRegressor()  # No parameters set yet; I still need to study this properly
        model.fit(X_train, Y_train)

        # Save the file in the working directory
        joblib_file = "joblib_model.pkl"
        joblib.dump(model, joblib_file)


class validation(luigi.Task):

    def requires(self):
        return training()

    def output(self):
        return luigi.LocalTarget('Metrics.csv')

    def run(self):
        data_val = pd.read_csv(self.input()[1].path)

        column_target = 'quality'  # Target variable to predict
        data_features = data_val.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()

        X_val = data_val[columns_features].values
        Y_val = data_val[column_target].values

        # Load the model saved during training
        joblib_model = joblib.load(self.input()[0].path)

        y_val_predict = joblib_model.predict(X_val)
        score = joblib_model.score(X_val, Y_val)

        logger.info('=== Predicted values')
        logger.info(y_val_predict)
        logger.info('=== Accuracy')
        logger.info('{:.2f} %'.format(score))

        metrics = {'Predictions': [y_val_predict],
                   'score': [score]}

        df = pd.DataFrame(metrics)

        logger.info("Saving to CSV file for TEST")
        df.to_csv(self.output()[0].path, index=False)

        # save several metrics in a DataFrame and export


if __name__ == '__main__':
    luigi.run()

0

I am also a beginner at luigi. Thanks for pointing out this kind of error.

Following the previous answer, I managed to solve it by adding this method to class2:

def run(self):
    # Create the file declared in output() so the dependency of class1 is met
    with self.output().open('w') as _out:
        _out.write(u"Hello World!\n")
    print('in class B')
KansaiRobot