kubernetes kubeflow kfp

Kubeflow Pipeline Termination Notification


I tried to add logic that sends a Slack notification when the pipeline terminates because of an error. I tried to implement this with ExitHandler, but it seems that an ExitHandler can't depend on any op. Does anyone have a good idea?


Solution

  • I found a solution that uses ExitHandler. I'm posting my code below in the hope that it helps someone else.

    
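    from kfp import dsl
    from kubernetes.client.models import (
        V1ConfigMapKeySelector,
        V1EnvVar,
        V1EnvVarSource,
    )
    
    
    def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
        """Create a ContainerOp that posts `status` to `slack_channel`."""
        send_slack_op = dsl.ContainerOp(
            name=name,
            image='wenmin.wu/slack-cli:latest',
            is_exit_handler=is_exit_handler,
            command=['sh', '-c'],
            arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
        )
        # Read the Slack token from the `workspace-config` ConfigMap.
        send_slack_op.add_env_variable(
            V1EnvVar(
                name='SLACK_CLI_TOKEN',
                value_from=V1EnvVarSource(
                    config_map_key_ref=V1ConfigMapKeySelector(
                        name='workspace-config', key='SLACK_CLI_TOKEN'
                    )
                ),
            )
        )
        return send_slack_op
    
    
    @dsl.pipeline(
        name='forecasting-supply',
        description='forecasting supply ...'
    )
    def ml_pipeline(
        param1,
        param2,
        param3,
        slack_channel,  # assumption: undefined in the original snippet, so taken as a pipeline parameter
    ):
        # Argo fills in {{workflow.name}} and {{workflow.status}} at run time.
        exit_task = slack_notification(
            slack_channel=slack_channel,
            name="supply-forecasting",
            status="Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
            is_exit_handler=True
        )
    
        # The exit task runs after everything inside this block, whether it succeeded or failed.
        with dsl.ExitHandler(exit_task):
            # put other tasks here
            pass  # placeholder so the block parses; replace with real ops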
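
Note that the exit handler runs whether the pipeline succeeds or fails, so the code above posts a message on every run. If you only want a message on errors, one option is to test `{{workflow.status}}` inside the shell command itself. Below is a minimal sketch of that idea, not something I have run against a cluster. `build_slack_command` is a hypothetical helper (it is not part of KFP), `#alerts` is a made-up channel name, and the sketch assumes Argo substitutes `{{workflow.status}}` with `Succeeded`, `Failed`, or `Error` before the container starts. It also uses `shlex.quote` so that a message containing an apostrophe does not break the single-quoted shell argument.

```python
import shlex


def build_slack_command(slack_channel: str, message: str, only_on_failure: bool = True) -> str:
    """Build the shell command string passed to `sh -c` in the exit-handler op.

    Hypothetical helper for illustration; /send-message.sh is the script
    used in the snippet above.
    """
    # shlex.quote protects the channel and the message from shell
    # word-splitting and from quotes inside the text.
    send = "/send-message.sh -d {} {}".format(
        shlex.quote(slack_channel), shlex.quote(message)
    )
    if not only_on_failure:
        return send
    # Assumption: Argo replaces {{workflow.status}} before the shell runs,
    # so the test compares a literal such as "Failed" against "Succeeded".
    return 'if [ "{{workflow.status}}" != "Succeeded" ]; then ' + send + "; fi"


if __name__ == "__main__":
    print(build_slack_command(
        "#alerts",  # hypothetical channel name
        "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
    ))
```

To use it, pass the returned string as the single element of `arguments` in `dsl.ContainerOp`, in place of the `.format(...)` expression. Keeping the check in the shell means the exit handler still always runs, but it only calls the Slack script when the status is not `Succeeded`.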