
How to skip tasks in Airflow?


I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions.

Let's say my DAG looks like this: task1 >> task2 >> task3 >> task4

I would like to trigger the DAG manually, starting from task3. What is the best way to do that?

I've read about ShortCircuitOperator, but I'm looking for a more ad-hoc solution that can be applied once the execution is triggered.

Thanks!


Solution

  • You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.

    from airflow.models import BaseOperator, SkipMixin
    from airflow.utils.decorators import apply_defaults


    class MySkippingOperator(BaseOperator, SkipMixin):
        """Skips all downstream tasks unless `condition` is truthy."""

        # apply_defaults is needed on Airflow 1.10; Airflow 2.x applies
        # default_args automatically and deprecates this decorator.
        @apply_defaults
        def __init__(self,
                     condition,
                     *args,
                     **kwargs):
            super().__init__(*args, **kwargs)
            self.condition = condition

        def execute(self, context):
            if self.condition:
                self.log.info('Proceeding with downstream tasks...')
                return

            self.log.info('Skipping downstream tasks...')

            # All tasks reachable downstream of this one, not just direct children.
            downstream_tasks = context['task'].get_flat_relatives(upstream=False)

            self.log.debug("Downstream task_ids %s", downstream_tasks)

            if downstream_tasks:
                # Mark those tasks' instances in the current DAG run as skipped.
                self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

            self.log.info("Done.")
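To see which tasks end up skipped, here is a plain-Python model of the decision made in the operator's execute method, with no Airflow dependency. It assumes the DAG is a dict mapping each task id to its direct downstream task ids; flat_downstream and tasks_to_skip are hypothetical helpers that only approximate what get_flat_relatives(upstream=False) plus self.skip do, not Airflow's actual implementation.

```python
from typing import Dict, List, Set


def flat_downstream(graph: Dict[str, List[str]], task_id: str) -> Set[str]:
    """Hypothetical helper: every task reachable downstream of task_id, transitively."""
    seen: Set[str] = set()
    stack = list(graph.get(task_id, []))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            # Follow this task's own children so the result is "flat" (all levels).
            stack.extend(graph.get(current, []))
    return seen


def tasks_to_skip(graph: Dict[str, List[str]], task_id: str, condition: bool) -> Set[str]:
    """Hypothetical model of execute(): skip nothing if condition holds, else all downstream."""
    if condition:
        return set()
    return flat_downstream(graph, task_id)


# task1 >> task2 >> task3 >> task4, as in the question
chain = {"task1": ["task2"], "task2": ["task3"], "task3": ["task4"], "task4": []}

print(sorted(tasks_to_skip(chain, "task2", condition=False)))  # ['task3', 'task4']
print(sorted(tasks_to_skip(chain, "task2", condition=True)))   # []
```

Using a set means a task reachable through several paths (for example in a diamond-shaped DAG) is only counted once.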