I tried to add logic that sends a Slack notification when the pipeline terminates due to an error. I tried to implement this with ExitHandler, but it seems the ExitHandler can't depend on any op. Do you have any good ideas?
1 Answer
I found a solution which uses ExitHandler. I'm posting my code below; I hope it can help someone else.
from kfp import dsl
from kubernetes.client.models import V1EnvVar, V1EnvVarSource, V1ConfigMapKeySelector


def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
    """
    Performs Slack notifications.
    """
    send_slack_op = dsl.ContainerOp(
        name=name,
        image='wenmin.wu/slack-cli:latest',
        is_exit_handler=is_exit_handler,
        command=['sh', '-c'],
        arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
    )
    # The Slack CLI token is read from an existing ConfigMap and exposed
    # to the container as the SLACK_CLI_TOKEN environment variable.
    send_slack_op.add_env_variable(
        V1EnvVar(
            name='SLACK_CLI_TOKEN',
            value_from=V1EnvVarSource(
                config_map_key_ref=V1ConfigMapKeySelector(
                    name='workspace-config',
                    key='SLACK_CLI_TOKEN'
                )
            )
        )
    )
    return send_slack_op


@dsl.pipeline(
    name='forecasting-supply',
    description='forecasting supply ...'
)
def ml_pipeline(
    slack_channel,  # notification channel, passed in as a pipeline parameter
    param1,
    param2,
    param3,
):
    exit_task = slack_notification(
        slack_channel=slack_channel,
        name="supply-forecasting",
        status="Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
        is_exit_handler=True
    )
    with dsl.ExitHandler(exit_task):
        # put other tasks here
        pass
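For reference, here is a minimal, self-contained sketch of how a pipeline built on this pattern might be filled in and compiled. The `echo_op` component, the demo pipeline name, the default channel, and the output filename are placeholders I'm assuming for illustration; they are not part of the original answer.

from kfp import dsl, compiler

def echo_op():
    # Hypothetical stand-in for the real forecasting/training steps.
    return dsl.ContainerOp(
        name='echo',
        image='alpine:3.12',
        command=['sh', '-c'],
        arguments=['echo "doing the actual work..."'],
    )

@dsl.pipeline(name='exit-handler-demo', description='ExitHandler demo')
def demo_pipeline(slack_channel: str = '#ml-alerts'):
    exit_task = slack_notification(
        slack_channel=slack_channel,
        name='notify',
        status='Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!',
        is_exit_handler=True,
    )
    # Every op created inside this context is guarded by exit_task,
    # which runs after the wrapped ops finish, whether they succeed or fail.
    with dsl.ExitHandler(exit_task):
        echo_op()

# Compile to an Argo workflow spec that can be uploaded to Kubeflow Pipelines.
compiler.Compiler().compile(demo_pipeline, 'demo_pipeline.yaml')

Note that the exit task itself does not consume outputs of the ops inside the handler, which is why the answer relies on Argo's workflow-level macros such as {{workflow.status}} rather than op outputs.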
- Hi @wenmin-wu, how do the {{workflow.xxx}} parameters get resolved? Is that a feature of KFP? I'm not seeing it documented anywhere. Any chance you know how that works? – Alex Latchford Jun 18 '20 at 16:25
- Hi @AlexLatchford, it's a feature of Argo, and since Kubeflow is based on Argo, all Argo macros can be used. Refer to https://github.com/argoproj/argo/blob/master/docs/variables.md for the full list of Argo macros. – Wenmin Wu Jul 15 '20 at 02:45
- Hey Wenmin, thanks so much for the link! Definitely didn't realize this was a feature, thanks for sharing! – Alex Latchford Jul 16 '20 at 15:56
- @WenminWu how do we get the output from a previous container passed to the exit handler? I went through the Argo docs; nothing seems to work. – Devs Nov 09 '20 at 08:12
- How do you access the workflow variable inside the KFP pipeline Python code? – RoyS Nov 19 '20 at 06:51
- Hi @RoyS, just get the variable in the `ml_pipeline` function and pass it to the Python code as a parameter. – Wenmin Wu Nov 27 '20 at 13:09
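To illustrate that last comment, here is a minimal sketch of passing an Argo workflow variable into a component as an ordinary argument. The `print_op` component and its image are hypothetical, not from the original thread.

from kfp import dsl

def print_op(message: str):
    # Hypothetical component that just echoes whatever string it receives.
    return dsl.ContainerOp(
        name='print-workflow-name',
        image='alpine:3.12',
        command=['sh', '-c'],
        arguments=['echo "{}"'.format(message)],
    )

@dsl.pipeline(name='workflow-var-demo')
def workflow_var_pipeline():
    # The Argo macro is passed as a plain string; Argo substitutes the real
    # workflow name at run time, before the container starts.
    print_op('current workflow: {{workflow.name}}')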