6

TL;DR;

I wasn't able to use prefect's FlowRunner to solve the above question. I likely either used it wrong (see below) or missed something. Would really appreciate any pointers!


The Problem

I read through the fantastic prefect core documentation and found the sections on Handling Failure and Local Debugging to be the most relevant to this (may have missed something!). The FlowRunner class appeared (to me) to be the solution.

To see if I could use Flow Runner to resume a failed flow:

  • Made a failing flow run:
from time import sleep

import prefect
from prefect import Flow, task


@task
def success():
    sleep(3)
    return


@task
def failure():
    return 1 / 0


def get_flow_runner():
    with Flow("Success/Failure") as flow:

        success()
        failure()

    return prefect.engine.FlowRunner(flow)
  • Ran it in iPython and saved the state:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
  • Replaced 1 / 0 with 1 / 1 in failure() so task would be successful:

  • And finally passed the previous state to the flow_runner hoping that it would resume the flow:

In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)

The entire flow ran again including the 3 second successful task.

blong
  • 2,815
  • 8
  • 44
  • 110
rdmolony
  • 601
  • 1
  • 7
  • 15

1 Answers1

5

The issue here is that you are rebuilding your Flow with each run, which changes the Task objects. state.result is a dictionary whose keys are Task objects - if the underlying Task object changes in any way, so will its hash. You should instead manually create the dictionary of states using the updated Task objects, like so:

from prefect.engine.state import Success

failure_task = runner.flow.get_tasks(name="failure")[0]
task_states = {failure_task: Success("Mocked success")}
chriswhite
  • 1,370
  • 10
  • 21
  • 1
    Thanks @chriswhite this solves the above example! I tried this in a new scenario where tasks pass data, and my mocked success was overridden. There's a section on checkpointing with result handlers in the prefect docs, could this be used to resume a flow at a broken task? – rdmolony Jul 28 '20 at 09:26
  • 2
    Could you share the code? If I had to guess you have multiple tasks named "failure" in your other example. FYI to mock data passage you can use `Success("Mocked data", result=42)` or whatever the data may be. – chriswhite Jul 28 '20 at 15:57