9

I am using Luigi to launch some pipeline. Let's take a simple exemple

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()

Now let's say that myTask is raising an exception during execution. All that I am able to have is a log from luigi showing the exception.

Is there any way that luigi could propagate it or at least return a failure status ?

I would then be able to make my programm react in function of that state.

Thanks.

EDIT I forgot to specify that luigi's outputs are targetting a database when I am storing the result. If an exception is raised, no result are stored but the exception is not propagated out a luigi. I was wondering if luigi have an option to have this.

MathiasDesch
  • 352
  • 3
  • 15
  • What do you mean propagate it or return a failure status? Anywhere that the program fails, you can put in a try except clause, and then you could use the except clause to write to luigi output. Then you could read in the luigi output and react accordingly. – Charlie Haley Sep 15 '15 at 17:21
  • Sorry, your comment made me realize that I need to be more specific on the way I am using luigi and the output. I am editing. – MathiasDesch Sep 16 '15 at 07:30

2 Answers2

22

From docs:

Luigi has a built-in event system that allows you to register callbacks to events and trigger them from your own tasks. You can both hook into some pre-defined events and create your own. Each event handle is tied to a Task class and will be triggered only from that class or a subclass of it. This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).

Example:

import luigi

from my_tasks import MyTask


@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """

    do_something()


luigi.run()

Luigi has a lot of events you can choose from. You can also take a look at this tests in order to learn how to listen and react to other events.

matagus
  • 6,136
  • 2
  • 26
  • 39
  • 1
    Adding that newer version of luigi handle now more Event. It became easier to handle error in the process of a task. – MathiasDesch Aug 25 '16 at 13:19
-1

What you could do is write an error to file. For instance, in your task that may fail (let's call it TaskA):

x=""
try:
    do stuff
except:
    x="error!"
with open('errorfile.log','w') as f:
    f.write(x)

Then, in a task that depends on that error, that task would require TaskA. And you could do something like this:

with open('errorfile.log','r') as f:
    if f.read()://if anything is in the error log from TaskA
        //error occurred
        do stuff
    else:
        do other stuff
Charlie Haley
  • 4,152
  • 4
  • 22
  • 36