airflow livy python-unittest.mock

How to mock connection for airflow's Livy Operator using unittest.mock


@mock.patch.dict(
    "os.environ",
    AIRFLOW_CONN_LIVY_HOOK="http://www.google.com",
    clear=True,
)
class TestLivyOperator(unittest.TestCase):
    """Build a LivyOperator while ``os.environ`` is patched.

    The class decorator patches ``os.environ`` so that, with ``clear=True``,
    the only variable present is ``AIRFLOW_CONN_LIVY_HOOK``.
    """

    def setUp(self):
        """Create the DAG that the operator under test is attached to."""
        super().setUp()
        self.dag = DAG(
            dag_id="test_livy",
            default_args={
                "owner": "xyz",
                "start_date": datetime(2022, 8, 16),
            },
        )

    @requests_mock.mock()
    def test_payload(self, mock_request):
        """Instantiate the operator; ``mock_request`` comes from requests_mock."""
        task = LivyOperator(
            task_id="task_1",
            class_name="com.precious.myClass",
            executor_memory="512m",
            executor_cores=3,
            arg=spark_args,
            # The environment-variable name itself is passed as the
            # connection id here; this is the id that the reported
            # AirflowNotFoundException says is not defined.
            livy_conn_id="AIRFLOW_CONN_LIVY_HOOK",
            dag=self.dag,
        )

I get a "connection not defined" error: airflow.exceptions.AirflowNotFoundException: The conn_id 'AIRFLOW_CONN_LIVY_HOOK' isn't defined


Solution

  • As per the Airflow documentation (https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#storing-connections-in-environment-variables), the naming convention for connection environment variables is AIRFLOW_CONN_{CONN_ID}, all uppercase (note the single underscores surrounding CONN). So if your connection id is my_prod_db, the variable name should be AIRFLOW_CONN_MY_PROD_DB. Here the variable is AIRFLOW_CONN_LIVY_HOOK, so the connection id to pass to the operator is livy_hook, not the variable name itself. Hence the above code should be:

       @requests_mock.mock()
       def test_payload(self, mock_request):
           """Build the operator using the connection id ``livy_hook``.

           ``livy_hook`` is the lower-cased suffix of the environment
           variable ``AIRFLOW_CONN_LIVY_HOOK`` (AIRFLOW_CONN_{CONN_ID}).
           """
           task = LivyOperator(
               task_id="task_1",
               class_name="com.precious.myClass",
               executor_memory="512m",
               executor_cores=3,
               # NOTE(review): LivyOperator documents this parameter as
               # `args`; confirm that `arg` is what was intended.
               arg=spark_args,
               # Pass the connection id, not the environment-variable name.
               livy_conn_id="livy_hook",
               dag=self.dag,
           )