# Expose an Airflow connection through the environment for the tests below.
# The variable name follows AIRFLOW_CONN_{CONN_ID}; clear=True removes every
# other environment variable while the patch is active.
@mock.patch.dict(
    "os.environ",
    AIRFLOW_CONN_LIVY_HOOK="http://www.google.com",
    clear=True,
)
class TestLivyOperator(unittest.TestCase):
    """Tests for ``LivyOperator`` using a connection defined via an environment variable."""

    def setUp(self):
        """Create the DAG that the operator under test is attached to."""
        super().setUp()
        self.dag = DAG(
            dag_id="test_livy",
            default_args={
                "owner": "xyz",
                "start_date": datetime(2022, 8, 16),
            },
        )

    @requests_mock.mock()
    def test_payload(self, mock_request):
        """Build a ``LivyOperator`` bound to the test DAG."""
        task = LivyOperator(
            task_id="task_1",
            class_name="com.precious.myClass",
            executor_memory="512m",
            executor_cores=3,
            # NOTE(review): the Livy provider documents this parameter as
            # `args`; confirm `arg` is accepted by the installed version.
            arg=spark_args,
            # NOTE: this passes the full environment variable name as the
            # conn id, which Airflow cannot resolve (it raises
            # AirflowNotFoundException). The conn id matching
            # AIRFLOW_CONN_LIVY_HOOK is "livy_hook".
            livy_conn_id="AIRFLOW_CONN_LIVY_HOOK",
            dag=self.dag,
        )
I get a "connection not defined" error: `airflow.exceptions.AirflowNotFoundException: The conn_id 'AIRFLOW_CONN_LIVY_HOOK' isn't defined`
As per the Airflow docs (https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#storing-connections-in-environment-variables), the naming convention is `AIRFLOW_CONN_{CONN_ID}`, all uppercase (note the single underscores surrounding CONN). So if your connection id is `my_prod_db`, then the variable name should be `AIRFLOW_CONN_MY_PROD_DB`. Here the environment variable is `AIRFLOW_CONN_LIVY_HOOK`, so the connection id to pass to the operator is `livy_hook`, not the variable name itself. Hence the above code should be:
@requests_mock.mock()
def test_payload(self, mock_request):
    """Build a ``LivyOperator`` that resolves its connection from the environment.

    The conn id ``"livy_hook"`` corresponds to the environment variable
    ``AIRFLOW_CONN_LIVY_HOOK`` (prefix ``AIRFLOW_CONN_`` plus the upper-cased
    conn id).
    """
    task = LivyOperator(
        task_id="task_1",
        class_name="com.precious.myClass",
        executor_memory="512m",
        executor_cores=3,
        # NOTE(review): the Livy provider documents this parameter as `args`;
        # confirm `arg` is accepted by the installed version.
        arg=spark_args,
        # Pass the conn id only, not the AIRFLOW_CONN_-prefixed variable name.
        livy_conn_id="livy_hook",
        dag=self.dag,
    )