I am trying to implement a parameterized timetable following this https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html#parameterized-timetables and I am getting an error when Airflow webserver is trying to serialize the DAG. Stack trace:
Traceback (most recent call last):
File "/opt/airflow/plugins/fiscal_calendar_plugin.py", line 74, in <module> class EveryFiscalPeriod(Timetable):
File "/opt/airflow/plugins/fiscal_calendar_plugin.py", line 127, in EveryFiscalPeriod def serialize(self) -> dict:
TypeError: 'type' object is not subscriptable
The error is happening in the serialize method below. Per the guide above I need to override the serialize method to be able to pass the hour and minute parameters to the timetable. The original serialize method is this one: https://apache.googlesource.com/airflow/+/HEAD/airflow/timetables/base.py#170
This is the timetable code in fiscal_calendar_plugin.py:
class EveryFiscalPeriod(Timetable):
def __init__(self, hour: Time, minute: Time) -> None:
self._hour = hour
self._minute = minute
def serialize(self) -> dict[str, Any]:
return {"hour": self._hour.isoformat(), "minute": self._minute.isoformat()}
@classmethod
def deserialize(cls, value: dict[str, Any]) -> Timetable:
return cls(Time.fromisoformat(value["hour"]), Time.fromisoformat(value["minute"]))
def next_dagrun_info(
self,
*,
last_automated_data_interval: Optional[DataInterval],
restriction: TimeRestriction,
) -> Optional[DagRunInfo]:
delta = timedelta(days=28)
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
next_start = last_automated_data_interval.end
next_end = last_automated_data_interval.end + delta
else: # This is the first ever run on the regular schedule.
restriction_earliest = restriction.earliest
next_start = restriction_earliest - delta
if next_start is None: # No start_date. Don't schedule.
return None
next_end = restriction_earliest
return DagRunInfo(
data_interval=DataInterval(start=next_start, end=next_end),
run_after=DateTime.combine(next_end.date(), self.hour, self.minute).replace(tzinfo=UTC),
)
This is my DAG code where I use the EveryFiscalPeriod
class. The timetable itself without parameters works, but it breaks when I make it parameterized.
with DAG(
catchup=False,
.........
),
max_active_runs=1,
schedule=EveryFiscalPeriod(hour=Time(15), minute=Time(30)),
)......
Any help will be much appreciated.
The solution was to add: from __future__ import annotations
to my timetable file to be able to use dict instead of Dict since I have python 3.8 per the answer here: Difference between defining typing.Dict and dict?