
I tried to add logic that sends a Slack notification when the pipeline terminates due to an error. I tried to implement this with ExitHandler, but it seems the ExitHandler can't depend on any op. Do you have any good ideas?

RunOrVeith
Wenmin Wu

1 Answer


I found a solution that uses ExitHandler. I'm posting my code below in the hope that it helps someone else.


from kfp import dsl  # KFP v1 SDK (dsl.ContainerOp)
from kubernetes.client import V1ConfigMapKeySelector, V1EnvVar, V1EnvVarSource


def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
    """
    Create an op that posts `status` to `slack_channel` using slack-cli.
    """
    send_slack_op = dsl.ContainerOp(
        name=name,
        image='wenmin.wu/slack-cli:latest',
        is_exit_handler=is_exit_handler,
        command=['sh', '-c'],
        arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
    )
    # Read the Slack token from the 'workspace-config' ConfigMap.
    send_slack_op.add_env_variable(
        V1EnvVar(
            name='SLACK_CLI_TOKEN',
            value_from=V1EnvVarSource(
                config_map_key_ref=V1ConfigMapKeySelector(
                    name='workspace-config', key='SLACK_CLI_TOKEN'
                )
            ),
        )
    )
    return send_slack_op

@dsl.pipeline(
    name='forecasting-supply',
    description='forecasting supply ...'
)
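def ml_pipeline(
    param1,
    param2,
    param3,
    slack_channel,  # assumed to be a pipeline parameter; the original snippet used it without defining it
):
    # {{workflow.name}} and {{workflow.status}} are Argo variables, substituted at run time.
    exit_task = slack_notification(
        slack_channel=slack_channel,
        name="supply-forecasting",
        status="Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
        is_exit_handler=True
    )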

    with dsl.ExitHandler(exit_task):
        # put other tasks here
        pass  # placeholder so the block is valid Python until tasks are added
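
One detail worth checking in the code above is how the shell command is assembled: the status text goes in as an argument to `str.format`, so its `{{workflow.name}}` and `{{workflow.status}}` placeholders are left untouched for Argo to fill in later. The following is a minimal sketch of that step on its own, using only the standard library. The helper name `build_slack_command` and the channel `#ml-alerts` are hypothetical, and it assumes both values are plain strings (not pipeline parameters). It also swaps the hand-written single quotes for `shlex.quote`, which is my substitution and not part of the original answer.

```python
import shlex


def build_slack_command(slack_channel: str, status: str) -> str:
    """Build the command string that would be handed to `sh -c` (hypothetical helper)."""
    # shlex.quote wraps each value so spaces, '#', and quotes survive the shell.
    return "/send-message.sh -d {} {}".format(
        shlex.quote(slack_channel), shlex.quote(status)
    )


# The double braces are part of the argument, not the format string,
# so str.format leaves them exactly as written.
status = "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!"
command = build_slack_command("#ml-alerts", status)  # "#ml-alerts" is a made-up channel
print(command)
```

Splitting the result with `shlex.split(command)` gives back the original channel and status text as separate arguments, which is a quick way to confirm the quoting before the string is placed in `arguments`.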

Vipul
Wenmin Wu
  • Hi @wenmin-wu, how do the {{workflow.xxx}} parameters get resolved? Is that a feature of KFP? I'm not seeing it documented anywhere. Any chance you know how that works? – Alex Latchford Jun 18 '20 at 16:25
  • Hi @AlexLatchford, it's a feature of Argo, and since Kubeflow is based on Argo, all Argo macros can be used. See https://github.com/argoproj/argo/blob/master/docs/variables.md for the full list of Argo macros. – Wenmin Wu Jul 15 '20 at 02:45
  • Hey Wenmin, thanks so much for the link! Definitely didn't realize this was a feature, thanks for sharing! – Alex Latchford Jul 16 '20 at 15:56
  • @WenminWu, how do we pass the output of a previous container to the exit handler? I went through the Argo docs, but nothing seems to work. – Devs Nov 09 '20 at 08:12
  • How can I access the workflow variable inside the KFPL Python code? – RoyS Nov 19 '20 at 06:51
  • Hi @RoyS, just get the variable in the `ml_pipeline` function and pass it to the Python code as a parameter. – Wenmin Wu Nov 27 '20 at 13:09
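
The last suggestion, passing the workflow variable into the Python code as a parameter, could look roughly like this on the receiving side. This is only a sketch under assumptions: the `--workflow-name` flag and the function names are hypothetical, and the pipeline is assumed to pass the literal string `{{workflow.name}}` as that argument's value so that Argo replaces it before the container starts.

```python
import argparse


def parse_args(argv=None):
    """Parse the command line of a hypothetical pipeline step."""
    parser = argparse.ArgumentParser(description="Hypothetical pipeline step")
    # The pipeline would pass "{{workflow.name}}" here; Argo fills in the real name.
    parser.add_argument("--workflow-name", required=True,
                        help="Name of the running workflow")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    message = f"Running inside workflow {args.workflow_name}"
    print(message)
    return message


# Local demonstration with an explicit argument list instead of sys.argv.
main(["--workflow-name", "demo-run"])
```

Inside the step, the value arrives as an ordinary string, so no Argo-specific handling is needed in the Python code itself.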