Tags: python, airflow

Apache Airflow unit and integration test


I am new to Apache Airflow and I am trying to figure out how to unit/integration test my DAGs/tasks.

Here is my directory structure

/airflow
  /dags
  /tests/dags

I created a simple DAG which has a task that reads data from a Postgres table:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def read_files(ti):
    sql = "select id from files where status='NEW'"
    pg_hook = PostgresHook(postgres_conn_id="metadata")
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(sql)
    files = cursor.fetchall()
    ti.xcom_push(key="files_to_process", value=files)


with DAG(dag_id="check_for_new_files", schedule_interval=timedelta(minutes=30),
         start_date=datetime(2022, 9, 1), catchup=False) as dag:

    check_files = PythonOperator(task_id="read_files",
                                 python_callable=read_files)

Is it possible to test this by mocking the Airflow/Postgres connection, etc.?


Solution

  • Yes, it is possible to test your DAGs. Here is an example of the basic things you can do:

    import unittest
    from airflow.models import DagBag
    
    class TestCheckForNewFilesDAG(unittest.TestCase):
        """Check Dag"""
    
        def setUp(self):
            self.dagbag = DagBag()
    
        def test_task_count(self):
            """Check task count for a dag"""
            dag_id='check_for_new_files'
            dag = self.dagbag.get_dag(dag_id)
            self.assertEqual(len(dag.tasks), 1)
    
        def test_contain_tasks(self):
            """Check task contains in hello_world dag"""
            dag_id='check_for_new_files'
            dag = self.dagbag.get_dag(dag_id)
            tasks = dag.tasks
            task_ids = list(map(lambda task: task.task_id, tasks))
            self.assertListEqual(task_ids, ['read_files'])
    
        def test_dependencies_of_read_files_task(self):
            """Check the task dependencies of a taskin hello_world dag"""
            dag_id='check_for_new_files'
            dag = self.dagbag.get_dag(dag_id)
            read_files_task = dag.get_task('read_files')
            
            # to be used in case you have upstream tasks
            upstream_task_ids = list(map(lambda task: task.task_id, 
                                         read_files_task.upstream_list))
            self.assertListEqual(upstream_task_ids, [])
            
            downstream_task_ids = list(map(lambda task: task.task_id, 
                                           read_files_task.downstream_list))
            self.assertListEqual(downstream_task_ids, [])
    
    suite = unittest.TestLoader().loadTestsFromTestCase(TestCheckForNewFilesDAG)
    unittest.TextTestRunner(verbosity=2).run(suite)
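
    Since your question asks specifically about mocking the Airflow/Postgres connection: you can also unit test the read_files callable directly by patching PostgresHook with unittest.mock, so no database or Airflow metastore is needed. This is only a minimal sketch; the import path dags.check_for_new_files is an assumption, adjust it to your project layout.

    import unittest
    from unittest import mock

    # Assumption: the DAG file is dags/check_for_new_files.py and read_files
    # can be imported from it; change the import path to match your layout.
    from dags.check_for_new_files import read_files

    class TestReadFilesCallable(unittest.TestCase):

        # Patch PostgresHook where it is used, i.e. in the DAG module
        @mock.patch("dags.check_for_new_files.PostgresHook")
        def test_read_files_pushes_xcom(self, mock_hook):
            # The mocked hook returns a fake cursor with canned rows
            mock_cursor = mock_hook.return_value.get_conn.return_value.cursor.return_value
            mock_cursor.fetchall.return_value = [(1,), (2,)]

            # The task instance is mocked too, so xcom_push can be asserted on
            mock_ti = mock.Mock()
            read_files(mock_ti)

            mock_cursor.execute.assert_called_once_with(
                "select id from files where status='NEW'")
            mock_ti.xcom_push.assert_called_once_with(
                key="files_to_process", value=[(1,), (2,)])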
    

    For verifying that the manipulated file data is moved correctly, the documentation suggests self-checks:

    https://airflow.apache.org/docs/apache-airflow/2.0.1/best-practices.html#self-checks

    Self-Checks

    You can also implement checks in a DAG to make sure the tasks are producing the results as expected. As an example, if you have a task that pushes data to S3, you can implement a check in the next task. For example, the check could make sure that the partition is created in S3 and perform some simple checks to determine if the data is correct.

    I think this is an excellent and straightforward way to verify a specific task.
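
    Applied to this DAG, a hedged sketch of such a self-check could be a downstream task that pulls the XCom pushed by read_files and fails if nothing was found. The check_files_found name and the check itself are illustrative assumptions; the operator and the dependency go inside the same with DAG(...) block as in your question.

    def check_files_found(ti):
        # Pull what read_files pushed and fail loudly if nothing came back
        files = ti.xcom_pull(task_ids="read_files", key="files_to_process")
        if not files:
            raise ValueError("read_files found no files with status='NEW'")

    # Inside the same `with DAG(...)` block as in the question:
    check_results = PythonOperator(task_id="check_files_found",
                                   python_callable=check_files_found)

    check_files >> check_results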

    Here are some other useful links you can use:

    https://www.youtube.com/watch?v=ANJnYbLwLjE

    The next ones talk about mocking:

    https://www.astronomer.io/guides/testing-airflow/

    https://medium.com/@montadhar/apache-airflow-testing-guide-7956a3f4bbf5

    https://godatadriven.com/blog/testing-and-debugging-apache-airflow/