python, airflow, timetable

TypeError: 'type' object is not subscriptable in Airflow timetable overridden serialize method


I am trying to implement a parameterized timetable following this guide: https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html#parameterized-timetables and I am getting an error when the Airflow webserver tries to serialize the DAG. Stack trace:

Traceback (most recent call last):
  File "/opt/airflow/plugins/fiscal_calendar_plugin.py", line 74, in <module>
    class EveryFiscalPeriod(Timetable):
  File "/opt/airflow/plugins/fiscal_calendar_plugin.py", line 127, in EveryFiscalPeriod
    def serialize(self) -> dict:
TypeError: 'type' object is not subscriptable

The error is happening in the serialize method below. Per the guide above I need to override the serialize method to be able to pass the hour and minute parameters to the timetable. The original serialize method is this one: https://apache.googlesource.com/airflow/+/HEAD/airflow/timetables/base.py#170

This is the timetable code in fiscal_calendar_plugin.py:

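# Imports (not shown in the original post; assumed to match the Airflow guide)
from datetime import timedelta
from typing import Any, Optional

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from pendulum import UTC, DateTime, Time


class EveryFiscalPeriod(Timetable):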
    def __init__(self, hour: Time, minute: Time) -> None:
        self._hour = hour
        self._minute = minute

    def serialize(self) -> dict[str, Any]:
        return {"hour": self._hour.isoformat(), "minute": self._minute.isoformat()}

    @classmethod
    def deserialize(cls, value: dict[str, Any]) -> Timetable:
        return cls(Time.fromisoformat(value["hour"]), Time.fromisoformat(value["minute"]))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        delta = timedelta(days=28)
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            next_start = last_automated_data_interval.end
            next_end = last_automated_data_interval.end + delta
        else:  # This is the first ever run on the regular schedule.
            restriction_earliest = restriction.earliest
            if restriction_earliest is None:  # No start_date. Don't schedule.
                return None
            next_start = restriction_earliest - delta
            next_end = restriction_earliest

        return DagRunInfo(
            data_interval=DataInterval(start=next_start, end=next_end),
            run_after=DateTime.combine(next_end.date(), self._hour, self._minute).replace(tzinfo=UTC),
        )

This is my DAG code where I use the EveryFiscalPeriod class. The timetable itself without parameters works, but it breaks when I make it parameterized.

with DAG(
    catchup=False,
    .........
    ),
    max_active_runs=1,
    schedule=EveryFiscalPeriod(hour=Time(15), minute=Time(30)),
)......

Any help will be much appreciated.


Solution

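  • The solution was to add from __future__ import annotations at the top of my timetable file. I am on Python 3.8, where the built-in dict cannot be subscripted (dict[str, Any] only works at runtime from Python 3.9 onward), so the annotation raised the TypeError as soon as the class body was evaluated. The future import defers evaluation of annotations, which lets me use dict instead of typing.Dict. See the answer here: Difference between defining typing.Dict and dict?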
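
For anyone who wants to try the fix without an Airflow install, here is a minimal sketch. FakeTimetable and builtin_dict_is_subscriptable are hypothetical names made up for this illustration, and plain integers stand in for the Time parameters; none of it is Airflow's actual API. It shows that the annotation is stored as a string once the future import is in place, and that evaluating dict[str, Any] directly still fails on Python 3.8.

```python
from __future__ import annotations  # must come before any other statement

import sys
from typing import Any, Dict


class FakeTimetable:
    """Hypothetical stand-in for a Timetable subclass; needs no Airflow."""

    def __init__(self, hour: int, minute: int) -> None:
        self._hour = hour
        self._minute = minute

    # Stored as the string "dict[str, Any]" and not evaluated when the
    # class body runs, so this no longer fails on Python 3.8.
    def serialize(self) -> dict[str, Any]:
        return {"hour": self._hour, "minute": self._minute}

    # typing.Dict is subscriptable on 3.8 even without the __future__ import.
    @classmethod
    def deserialize(cls, value: Dict[str, Any]) -> FakeTimetable:
        return cls(value["hour"], value["minute"])


def builtin_dict_is_subscriptable() -> bool:
    """Evaluate dict[str, Any] at runtime, as Python 3.8 did for the annotation."""
    try:
        dict[str, Any]
    except TypeError:  # 'type' object is not subscriptable
        return False
    return True


round_tripped = FakeTimetable.deserialize(FakeTimetable(15, 30).serialize())
print(round_tripped.serialize())
print(sys.version_info[:2], builtin_dict_is_subscriptable())
```

If adding the future import is not an option, annotating with typing.Dict[str, Any] everywhere (as deserialize does above) should also avoid the error on Python 3.8.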