airflow jupyter pipeline kubernetespodoperator elyra

How to add a custom component to Elyra's list of available Airflow operators?


I am trying to build my own component based on KubernetesPodOperator. I can define the component and add it to the list of components, but when I try to run it, I get:

Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators. Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.

and the following traceback:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/site-packages/tornado/web.py", line 1704, in _execute
    result = await result
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/handlers.py", line 120, in post
    response = await PipelineProcessorManager.instance().process(pipeline)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/processor.py", line 134, in process
    res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
  File "/opt/conda/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/opt/conda/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/opt/conda/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/opt/conda/lib/python3.9/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 122, in process
    pipeline_filepath = self.create_pipeline_file(pipeline=pipeline,
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 420, in create_pipeline_file
    target_ops = self._cc_pipeline(pipeline, pipeline_name)
  File "/opt/conda/lib/python3.9/site-packages/elyra/pipeline/airflow/processor_airflow.py", line 368, in _cc_pipeline
    raise ValueError(f"Operator '{component.name}' of node '{operation.name}' is not configured "
ValueError: Operator 'KubernetesPodOperator' of node 'KubernetesPodOperator' is not configured in the list of available operators.  Please add the fully-qualified package name for 'KubernetesPodOperator' to the AirflowPipelineProcessor.available_airflow_operators configuration.

After looking through the source code, I found these lines in processor_airflow.py:

    # This specifies the default airflow operators included with Elyra.  Any Airflow-based
    # custom connectors should create/extend the elyra configuration file to include
    # those fully-qualified operator/class names.
    available_airflow_operators = ListTrait(
        CUnicode(),
        ["airflow.operators.slack_operator.SlackAPIPostOperator",
         "airflow.operators.bash_operator.BashOperator",
         "airflow.operators.email_operator.EmailOperator",
         "airflow.operators.http_operator.SimpleHttpOperator",
         "airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
         "airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator"],
        help="""List of available Apache Airflow operator names.
Operators available for use within Apache Airflow pipelines.  These operators must
be fully qualified (i.e., prefixed with their package names).
       """,
    ).tag(config=True)

However, I am unsure whether this list can be extended from the client side.


Solution

  • The available_airflow_operators list is a configurable trait in Elyra. You’ll have to add the fully-qualified package name for the KubernetesPodOperator to this list so that Elyra can generate the DAG correctly.

    To do so, generate a config file from the command line with jupyter elyra --generate-config. Open the created file and add the following line (you can add it under the PipelineProcessor(LoggingConfigurable) heading if you prefer to keep the file organized):

    c.AirflowPipelineProcessor.available_airflow_operators.append("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator")
    

    Change that string value to the correct package for your use case if it's not the above (make sure that it ends with the class name of the required operator). If you need to add multiple packages, you can also use extend rather than append.

    Edit: here is the link to the relevant documentation
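
As a rough illustration of the append/extend step above, the sketch below collects the extra operator paths in one list and checks that each class name used in a pipeline is the last component of some configured path. The helper `unconfigured_operators`, the `EXTRA_OPERATORS` name, and the `my_package.operators.MyCustomOperator` path are hypothetical and are not part of Elyra. The check only mirrors the "ends with the class name" advice and is not Elyra's actual validation code.

```python
from typing import Iterable, List

# A subset of the defaults quoted from processor_airflow.py in the question.
DEFAULT_OPERATORS = [
    "airflow.operators.bash_operator.BashOperator",
    "airflow.operators.email_operator.EmailOperator",
]

# Operators to add. The second entry is a hypothetical placeholder.
EXTRA_OPERATORS = [
    "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator",
    "my_package.operators.MyCustomOperator",
]


def unconfigured_operators(class_names: Iterable[str],
                           available: Iterable[str]) -> List[str]:
    """Return the class names that no fully-qualified path ends with."""
    # The class name is the last dot-separated component of each path.
    configured = {path.rsplit(".", 1)[-1] for path in available}
    return [name for name in class_names if name not in configured]


needed = ["KubernetesPodOperator", "BashOperator"]
print(unconfigured_operators(needed, DEFAULT_OPERATORS))
print(unconfigured_operators(needed, DEFAULT_OPERATORS + EXTRA_OPERATORS))

# In the generated config file, where `c` is supplied by the config loader,
# the same list could be passed to extend instead of repeated append calls:
# c.AirflowPipelineProcessor.available_airflow_operators.extend(EXTRA_OPERATORS)
```

The first call reports KubernetesPodOperator as missing, and the second reports nothing once the extra paths are included. Keeping the paths in one list makes it easier to spot an entry that stops at the module name instead of the class name.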