workflow prefect

How to implement specific exception handling in Prefect 2


Suppose I have a flow that is scheduled to run every 5 minutes.

from prefect import flow, task

@task
def my_favorite_function():
    raise ValueError("This flow immediately fails")

@task
def one_return():
    return 1

@task
def tow_return():
    return 2

@flow
def run_flow():
    my_favorite_function()
    one_return()
    tow_return()

How can I put subsequent tasks, flows, and scheduled runs on hold when a task fails?

In this case, I want to put one_return() and tow_return() on hold, along with later scheduled runs of the same flow, until the error is resolved.

I read the documentation but couldn't figure out how to implement the specifics.


Solution

  • By default, when a task that is called directly fails, its exception propagates and the flow run fails immediately. If you want to avoid that, you could change your flow code as follows, so that the tasks are submitted to the task runner instead of being run as blocking calls:

    @flow
    def run_flow():
        my_favorite_function.submit()
        one_return.submit()
        tow_return.submit()
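To illustrate why submitting behaves differently from calling directly, here is a minimal sketch that uses only the standard library's concurrent.futures as an analogy. It is not Prefect code and not how Prefect is implemented; the name run_flow_submitted and the outcome tuples are hypothetical, chosen for illustration. It shows that a submitted call stores its exception on the returned future instead of raising it in the caller, so the later steps still run.

```python
from concurrent.futures import ThreadPoolExecutor


def my_favorite_function():
    raise ValueError("This flow immediately fails")


def one_return():
    return 1


def tow_return():
    return 2


def run_flow_submitted():
    """Submit every step; a failure is stored on its future, not raised here."""
    steps = (my_favorite_function, one_return, tow_return)
    # Leaving the `with` block waits for all submitted calls to finish.
    with ThreadPoolExecutor(max_workers=1) as pool:
        futures = [(fn.__name__, pool.submit(fn)) for fn in steps]

    outcomes = []
    for name, future in futures:
        error = future.exception()  # None when the call succeeded
        if error is None:
            outcomes.append((name, "completed", future.result()))
        else:
            outcomes.append((name, "failed", type(error).__name__))
    return outcomes


if __name__ == "__main__":
    for outcome in run_flow_submitted():
        print(outcome)
```

Note that this pattern keeps the later steps running after a failure. If the goal is instead to hold them back, the outcome of the first step would have to be inspected before the remaining steps are submitted.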