I've created a new DAG with the following arguments:
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'catchup': False,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}
dag = DAG(
    'sample_dag',
    default_args=default_args,
    description='sample dag',
    schedule_interval="44 * * * *")
But the scheduler does not pick up the DAG when the scheduled time comes, although it runs fine when I trigger it manually. Is there anything I'm missing here?
Also, the scheduler threw an error when the cron expression was "*/5 * * * *":
CroniterBadCronError: Exactly 5 or 6 columns has to be specified for iteratorexpression.
But the cron expression looks good to me.
The reason is that the time the DAG first runs is start_date + schedule_interval. If you set start_date to something dynamic, such as datetime.now(), it is re-evaluated every time the DAG file is parsed, so it keeps moving forward and start_date + schedule_interval never arrives. The DAG therefore never executes on schedule.
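The arithmetic above can be sketched in plain Python. This is only an illustration of the rule, not Airflow's actual scheduler code: the helper first_run_due and the list of check times are hypothetical names, and a fixed one-hour timedelta stands in for the hourly cron expression.

```python
from datetime import datetime, timedelta


def first_run_due(start_date, interval, now):
    """Return True once start_date + interval has been reached (hypothetical helper)."""
    return now >= start_date + interval


# Assumption: a one-hour interval stands in for the "44 * * * *" cron schedule.
interval = timedelta(hours=1)
static_start = datetime(2018, 1, 1)

# Simulated moments at which the scheduler re-parses the DAG file, 30 minutes apart.
checks = [datetime(2018, 1, 1, 0, 30) + timedelta(minutes=30 * i) for i in range(6)]

# Static start_date: the first run becomes due as soon as one interval has elapsed.
static_results = [first_run_due(static_start, interval, now) for now in checks]

# Dynamic start_date: it is recomputed as "now" on every parse, so
# start_date + interval always lies one hour in the future.
dynamic_results = [first_run_due(now, interval, now) for now in checks]

print(static_results)
print(dynamic_results)
```

With the static start date the check turns true from the second simulated parse onward; with the moving start date it never does, which matches the behaviour described in the question.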
It is explained here, and there is also another question here on Stack Overflow with an answer; they probably explain it better than I do.
You should change your start_date to something static rather than datetime.now().
If you do not want backfilling in your DAG, you need to set catchup=False as an argument of the DAG itself, not as an entry in default_args (those are passed on to the tasks). So something like the following:
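from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # static start date, so start_date + schedule_interval can actually be reached
    'start_date': datetime(2018, 1, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'sample_dag',
    catchup=False,  # DAG-level argument: do not backfill missed intervals
    default_args=default_args,
    description='sample dag',
    schedule_interval="44 * * * *"
)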