Broken DAG: [<redacted>]
Traceback (most recent call last):
File "<redacted>", line 198, in <module>
some_task_op() >> Transfer >> short_circuit() >> Process >> Summarize
^^^^^^^^^^^^^^
File "/python/env/lib/python3.12/site-packages/airflow/decorators/base.py", line 372, in __call__
op = self.operator_class(
^^^^^^^^^^^^^^^^^^^^
TypeError: airflow.decorators.sensor.DecoratedSensorOperator() got multiple values for keyword argument 'op_kwargs'
This error is thrown for any Python operator in my DAG, and it's quite puzzling because I'm only giving the decorator op_kwargs
once.
# dag.py
default_args = {
'depends_on_past': True,
'email': ['admin@admin.test'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'start_date': pendulum.datetime(2024, 10, 2),
}
with DAG(
dag_id='mydag',
schedule='0 14 * * *',
catchup=True,
default_args=default_args,
params={
'snapshot': Param(None, type=['null', 'string'], format='date'),
'input_base': Param(Variable.get('some-home', '/path/to/home'), type='string'),
'output_base': Param(Variable.get('some-home', '/path/to/home'), type='string'),
'overwrite': Param(False, type='boolean'),
'deploy_env': Param('production', type=['string'], enum=['staging', 'production'])
}
) as dag:
@task.sensor(
dag=dag,
task_id='some_task_op',
mode='reschedule', # default is 'poke'
poke_interval=10*60,
timeout=4*60*60,
op_kwargs={
'creds': '{{ var.value.get("api-credentials") }}',
},
)
def some_task_op(params, data_interval_end, creds=None):
pass
The sensor decorator function forwards extra options as the op_kwargs
to initialize the underlying operator class.
You can write:
@task.sensor(
dag=dag,
task_id='some_task_op',
mode='reschedule', # default is 'poke'
poke_interval=10*60,
timeout=4*60*60,
creds='{{ var.value.get("api-credentials") }}',
)
def some_task_op(params, data_interval_end, creds=None):
pass