TL;DR
I wasn't able to use Prefect's FlowRunner to solve the above question. I probably either used it incorrectly (see below) or missed something. I would really appreciate any pointers!
The Problem
I read through the fantastic Prefect Core documentation and found the sections on Handling Failure and Local Debugging to be the most relevant to this (though I may have missed something!). The FlowRunner class appeared to me to be the solution.
To see if I could use FlowRunner to resume a failed flow, I:
- Made a failing flow run:
from time import sleep

import prefect
from prefect import Flow, task


@task
def success():
    sleep(3)
    return


@task
def failure():
    return 1 / 0


def get_flow_runner():
    with Flow("Success/Failure") as flow:
        success()
        failure()
    return prefect.engine.FlowRunner(flow)
- Ran it in IPython and saved the state:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
- Replaced 1 / 0 with 1 / 1 in failure() so the task would succeed.
- Finally, passed the previous state to the flow_runner, hoping that it would resume the flow:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)
The entire flow ran again, including the 3-second successful task, instead of resuming from the failed task.
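One guess at what might be going on, which I have not confirmed against Prefect's source: if task_states is a dict keyed by the task objects themselves, then the keys saved from the first run would not match the new task objects created when get_flow_runner() builds the flow a second time. The sketch below contains no Prefect code at all. FakeTask and build_tasks are hypothetical stand-ins that only show how an identity-keyed dict behaves in plain Python when its keys are rebuilt.

```python
class FakeTask:
    """Hypothetical stand-in for a task; hashed by identity (Python's default)."""

    def __init__(self, name):
        self.name = name


def build_tasks():
    """Mimics rebuilding the flow: returns brand-new task objects on every call."""
    return [FakeTask("success"), FakeTask("failure")]


# First "run": record one state per task object.
first_tasks = build_tasks()
saved_states = {
    task: "Success" if task.name == "success" else "Failed"
    for task in first_tasks
}

# Second "run": the flow is rebuilt, so the task objects are new.
second_tasks = build_tasks()
matched = [task.name for task in second_tasks if task in saved_states]

# Re-keying the saved states by name is one way to line them up again.
states_by_name = {task.name: state for task, state in saved_states.items()}
rekeyed = {task: states_by_name[task.name] for task in second_tasks}

print(matched)
print(sorted(rekeyed.values()))
```

If this guess applies to Prefect as well, the saved states would need to be matched to the rebuilt flow's tasks (for example by name) before being passed in, or the same flow object would need to be reused. I would welcome confirmation either way.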