I am trying to implement a parameterized timetable following the guide at https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html#parameterized-timetables, and I am getting the following error message in the logs: `TypeError: __init__() missing 2 required positional arguments: 'hour' and 'minute'`. This is the timetable code:
```python
from datetime import timedelta
from typing import Optional

from pendulum import UTC, DateTime, Time

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable


class EveryFiscalPeriod(Timetable):
    def __init__(self, hour: int, minute: int) -> None:
        self._hour = hour
        self._minute = minute

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        delta = timedelta(days=28)
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            next_start = last_automated_data_interval.end
            next_end = last_automated_data_interval.end + delta
        else:  # This is the first ever run on the regular schedule.
            restriction_earliest = restriction.earliest
            if restriction_earliest is None:  # No start_date. Don't schedule.
                return None
            next_start = restriction_earliest - delta
            next_end = restriction_earliest
        return DagRunInfo(
            data_interval=DataInterval(start=next_start, end=next_end),
            # Run at the configured hour and minute (UTC) on the day the interval ends.
            run_after=DateTime.combine(next_end.date(), Time(self._hour, self._minute)).replace(tzinfo=UTC),
        )
```
In my DAG code I am passing the hour and minute into the class so I shouldn't be getting this error. The timetable itself without parameters works, but it breaks when I make it parameterized.
```python
with DAG(
    catchup=False,
    .........
    ),
    max_active_runs=1,
    schedule=EveryFiscalPeriod(hour=15, minute=30),
)......
```
Stack trace:
[2024-02-03T06:29:28.294+0000] {app.py:1744} ERROR - Exception on /dags/accrual_repot_missing_orders/grid [GET]
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 2529, in wsgi_app
response = self.full_dispatch_request()
File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1825, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1823, in full_dispatch_request
rv = self.dispatch_request()
File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1799, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/auth.py", line 53, in decorated
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/decorators.py", line 168, in view_func
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/decorators.py", line 127, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 79, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/views.py", line 2936, in grid
dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dagbag.py", line 189, in get_dag
self._add_dag_from_db(dag_id=dag_id, session=session)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dagbag.py", line 271, in _add_dag_from_db
dag = row.dag
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/serialized_dag.py", line 221, in dag
return SerializedDAG.from_dict(data)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 1413, in from_dict
return cls.deserialize_dag(serialized_obj["dag"])
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 1341, in deserialize_dag
v = _decode_timetable(v)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 211, in _decode_timetable
return timetable_class.deserialize(var[Encoding.VAR])
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/timetables/base.py", line 168, in deserialize
return cls()
TypeError: __init__() missing 2 required positional arguments: 'hour' and 'minute'
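
The last two frames of the trace (`deserialize` returning `cls()`) can be reproduced without Airflow. This is only a minimal sketch: the `Timetable` class below is a stand-in that copies the base-class defaults (`serialize` returning an empty dict, `deserialize` calling `cls()`), and `NoArgs` and `WithArgs` are hypothetical names used for illustration.

```python
from typing import Any, Dict


class Timetable:
    """Stand-in for airflow.timetables.base.Timetable (assumption: only its defaults matter here)."""

    def serialize(self) -> Dict[str, Any]:
        return {}

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
        # Same call as the final frame in the trace: no arguments are passed.
        return cls()


class NoArgs(Timetable):
    """Hypothetical timetable without constructor parameters."""


class WithArgs(Timetable):
    """Hypothetical timetable with required constructor parameters."""

    def __init__(self, hour: int, minute: int) -> None:
        self._hour = hour
        self._minute = minute


# Round trip works when __init__ takes no arguments.
no_args_restored = NoArgs.deserialize(NoArgs().serialize())

# Round trip fails when __init__ requires arguments that were never serialized.
error_message = None
try:
    WithArgs.deserialize(WithArgs(hour=15, minute=30).serialize())
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

This matches the behaviour described above: the timetable without parameters survives the round trip, while the parameterized one raises the `TypeError` as soon as it is rebuilt from the serialized DAG.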
Any help will be much appreciated.
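Looking at https://apache.googlesource.com/airflow/+/HEAD/airflow/timetables/base.py#168, which is where the stack trace ends, the default `deserialize` method just calls `cls()` with no arguments, so `hour` and `minute` are never passed back to `__init__`. Per the guide, it looks like I need to override `serialize` to return those parameters and `deserialize` to pass them to the constructor.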
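
A rough sketch of what that override might look like, not a verified fix. The dictionary keys `"hour"` and `"minute"` are my own choice, and the fallback `Timetable` class is only a stand-in so the snippet runs where Airflow is not installed; `next_dagrun_info` and the other scheduling methods are left out here.

```python
from typing import Any, Dict

try:
    from airflow.timetables.base import Timetable
except ImportError:
    # Stand-in (assumption) so the sketch runs without Airflow installed.
    class Timetable:
        def serialize(self) -> Dict[str, Any]:
            return {}

        @classmethod
        def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
            return cls()


class EveryFiscalPeriod(Timetable):
    def __init__(self, hour: int, minute: int) -> None:
        self._hour = hour
        self._minute = minute

    def serialize(self) -> Dict[str, Any]:
        # Values must be JSON-serializable; plain ints are fine.
        return {"hour": self._hour, "minute": self._minute}

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "EveryFiscalPeriod":
        # Rebuild the instance from what serialize() stored.
        return cls(hour=data["hour"], minute=data["minute"])

    # next_dagrun_info(...) and the other scheduling methods are omitted in this sketch.


original = EveryFiscalPeriod(hour=15, minute=30)
restored = EveryFiscalPeriod.deserialize(original.serialize())
print(restored._hour, restored._minute)
```

With both methods in place, the round trip that the webserver performs when loading the serialized DAG has the values it needs to call `__init__`.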