airflow

Count attempts in airflow sensor


I have a sensor that waits for a file to appear in an external file system

The sensor uses mode="reschedule"

I would like to trigger a specific behavior after X failed attempts.

Is there any straightforward way to know how many times the sensor has already attempted to run the poke method?

My quick fix so far has been to push an XCom with the attempt number, and increase it every time the poke method returns False. Is there any built-in mechanism for this?

Thank you


Solution

  • Apparently, the XCom thing isn't working, because pushed XComs don't seem to be available between pokes; they always return undefined.

    try_number inside task_instance doesn't help either, as pokes don't count as a new try number

    I ended up computing the attempt number by hand:

    attempt_no = math.ceil((pendulum.now(tz='utc') - kwargs['ti'].start_date).seconds / kwargs['task'].poke_interval)
    

    The code will work fine as long as individual executions of the poke method don't last longer than the poke interval (which they shouldn't)

    Best