
Airflow Python Operator TypeError: got multiple values for keyword argument 'op_kwargs'


Broken DAG: [<redacted>]
Traceback (most recent call last):
  File "<redacted>", line 198, in <module>
    some_task_op() >> Transfer >> short_circuit() >> Process >> Summarize
    ^^^^^^^^^^^^^^
  File "/python/env/lib/python3.12/site-packages/airflow/decorators/base.py", line 372, in __call__
    op = self.operator_class(
         ^^^^^^^^^^^^^^^^^^^^
TypeError: airflow.decorators.sensor.DecoratedSensorOperator() got multiple values for keyword argument 'op_kwargs'

This error is raised for every decorated Python task in my DAG. That is puzzling, because I pass op_kwargs to the decorator only once.

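# dag.py

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.models.param import Param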

default_args = {
    'depends_on_past': True,
    'email': ['admin@admin.test'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'start_date': pendulum.datetime(2024, 10, 2),
}

with DAG(
    dag_id='mydag',
    schedule='0 14 * * *',
    catchup=True,
    default_args=default_args,
    params={
        'snapshot': Param(None, type=['null', 'string'], format='date'),
        'input_base': Param(Variable.get('some-home', '/path/to/home'), type='string'),
        'output_base': Param(Variable.get('some-home', '/path/to/home'), type='string'),
        'overwrite': Param(False, type='boolean'),
        'deploy_env': Param('production', type=['string'], enum=['staging', 'production'])
    }
) as dag:
    @task.sensor(
        dag=dag,
        task_id='some_task_op',
        mode='reschedule',  # default is 'poke'
        poke_interval=10*60,
        timeout=4*60*60,
        op_kwargs={
            'creds': '{{ var.value.get("api-credentials") }}',
        },
    )
    def some_task_op(params, data_interval_end, creds=None):
        pass

Solution

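  • The object returned by @task.sensor(...) builds op_kwargs itself. When you call some_task_op(...) in the DAG, it instantiates the operator with op_kwargs set to the keyword arguments of that call, and it also forwards every option given to @task.sensor(...) as an extra operator keyword argument. An op_kwargs in the decorator therefore reaches the operator a second time, which is exactly the "multiple values" error in the traceback. Remove op_kwargs from the decorator and pass creds when you call the task. Moving creds=... into the decorator options does not work either, because those go to the operator class itself, which rejects unknown arguments. Since op_kwargs is a templated field, the Jinja string is still rendered at run time.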

    You can write:

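        @task.sensor(
            dag=dag,
            task_id='some_task_op',
            mode='reschedule',  # default is 'poke'
            poke_interval=10*60,
            timeout=4*60*60,
        )
        def some_task_op(params, data_interval_end, creds=None):
            pass  # placeholder: a sensor should return True (or a PokeReturnValue) when done

        # Call-time keyword arguments become op_kwargs, which Airflow renders as templates.
        some_task_op(creds='{{ var.value.get("api-credentials") }}') >> Transfer >> short_circuit() >> Process >> Summarize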
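To see why the original version fails, here is a simplified, pure-Python model of the call path. FakeSensorOperator and fake_task_sensor are hypothetical stand-ins, not Airflow APIs, and the real decorator does much more (task IDs, context handling, argument validation). The model mirrors only the detail that matters here: the decorator's __call__ (the frame shown in the traceback) builds the operator from op_kwargs taken from the call plus the decorator's own options.

```python
from typing import Any, Callable, Optional


class FakeSensorOperator:
    """Hypothetical stand-in for an operator class; it only records its inputs."""

    def __init__(self, *, python_callable: Callable[..., Any],
                 op_args: tuple = (), op_kwargs: Optional[dict] = None,
                 **options: Any) -> None:
        self.python_callable = python_callable
        self.op_args = op_args
        self.op_kwargs = op_kwargs or {}
        self.options = options


def fake_task_sensor(**decorator_kwargs: Any):
    """Hypothetical, simplified model of @task.sensor(...)."""

    def wrap(function: Callable[..., Any]):
        def call(*args: Any, **kwargs: Any) -> FakeSensorOperator:
            # Call-time arguments become op_args/op_kwargs, while the
            # decorator's own options are forwarded as extra keywords.
            # If decorator_kwargs also holds 'op_kwargs', Python raises
            # "got multiple values for keyword argument 'op_kwargs'".
            return FakeSensorOperator(
                python_callable=function,
                op_args=args,
                op_kwargs=kwargs,
                **decorator_kwargs,
            )
        return call
    return wrap


@fake_task_sensor(mode="reschedule", op_kwargs={"creds": "secret"})
def broken(creds=None):
    return True


@fake_task_sensor(mode="reschedule")
def fixed(creds=None):
    return True


try:
    broken()
    error = None
except TypeError as exc:
    error = str(exc)
print(error)

op = fixed(creds="secret")
print(op.op_kwargs, op.options)  # {'creds': 'secret'} {'mode': 'reschedule'}
```

In the model, the fixed variant keeps operator options (mode) and function arguments (creds) in separate channels, which is the same split the corrected DAG relies on.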