pythonairflowtimetable

TypeError: __init__() missing 2 required positional arguments in Airflow timetable


I am trying to implement a parameterized timetable following this https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html#parameterized-timetables and I am getting the following error message in the logs: TypeError: __init__() missing 2 required positional arguments: 'hour' and 'minute'. This is the timetable code:

`class EveryFiscalPeriod(Timetable):    
    def __init__(self, hour: int, minute: int) -> None:
        self._hour = hour
        self._minute = minute

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        delta = timedelta(days=28)
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            next_start = last_automated_data_interval.end
            next_end = last_automated_data_interval.end + delta
        else:  # This is the first ever run on the regular schedule.
            restriction_earliest = restriction.earliest
            next_start = restriction_earliest - delta
            if next_start is None:  # No start_date. Don't schedule.
                return None
            next_end = restriction_earliest

        return DagRunInfo(
            data_interval=DataInterval(start=next_start, end=next_end),
            run_after=DateTime.combine(next_end.date(), Time(self.hour), Time(self.minute)).replace(tzinfo=UTC),
    )`

In my DAG code I am passing the hour and minute into the class so I shouldn't be getting this error. The timetable itself without parameters works, but it breaks when I make it parameterized.

`with DAG(
    catchup=False,
    .........
    ),
    max_active_runs=1,
    schedule=EveryFiscalPeriod(hour=15, minute=30),
)......`

Stack trace:

    [2024-02-03T06:29:28.294+0000] {app.py:1744} ERROR - Exception on /dags/accrual_repot_missing_orders/grid [GET]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.8/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/auth.py", line 53, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/decorators.py", line 168, in view_func
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/decorators.py", line 127, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/www/views.py", line 2936, in grid
    dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dagbag.py", line 189, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dagbag.py", line 271, in _add_dag_from_db
    dag = row.dag
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/serialized_dag.py", line 221, in dag
    return SerializedDAG.from_dict(data)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 1413, in from_dict
    return cls.deserialize_dag(serialized_obj["dag"])
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 1341, in deserialize_dag
    v = _decode_timetable(v)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", line 211, in _decode_timetable
    return timetable_class.deserialize(var[Encoding.VAR])
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/timetables/base.py", line 168, in deserialize
    return cls()
TypeError: __init__() missing 2 required positional arguments: 'hour' and 'minute'

Any help will be much appreciated.


Solution

  • Looking at this file https://apache.googlesource.com/airflow/+/HEAD/airflow/timetables/base.py#168 per the stack trace the default deserialize method doesn't return hour and minute. Per the guide it looks like I need to override the method to return those params.