google-cloud-dataflow, apache-beam, google-cloud-composer, airflow-2.x

Why is BeamRunPythonPipelineOperator unable to track the Dataflow job status, and why does it keep waiting until the job ends without returning Dataflow logs?


I am triggering a Dataflow pipeline with BeamRunPythonPipelineOperator() in Airflow on Cloud Composer (composer-2.9.8-airflow-2.9.3). The job is submitted to Dataflow successfully, but the Airflow task keeps running without logging any job status updates from Dataflow and eventually exits with INFO - Process exited with return code: 0. I want to track the job status in Airflow so that I can trigger subsequent tasks based on it (e.g. JOB_STATE_DONE); a rough version of the follow-up sensor I have in mind is sketched after the operator below.

My operator is set up as follows:

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

start_dataflow_job = BeamRunPythonPipelineOperator(
    task_id="start_dataflow_job",
    runner="DataflowRunner",
    py_file=GCS_FILE_LOCATION,
    pipeline_options={
        "tempLocation": GCS_BUCKET,
        "stagingLocation": GCS_BUCKET,
        "output_project": PROJECT,
        "service_account_email": GCP_CUSTOM_SERVICE_ACCOUNT,
        "requirements_file": "gs://GCS_CODE_BUCKET/requirements.txt",
        "max_num_workers": "2",
        "region": "us-east1",
        "experiments": [
            "streaming_boot_disk_size_gb=100",
            "workerLogLevelOverrides=com.google.cloud.dataflow#DEBUG",
            "dataflow_service_options=enable_prime",
        ],
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]~=2.60.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=PROJECT,
        location="us-east1",
        wait_until_finished=False,
        gcp_conn_id="google_cloud_default",
    ),
    do_xcom_push=True,
)
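
For context, the kind of downstream task I want to gate on the job status is a Dataflow job-status sensor along these lines. This is only a sketch: I am assuming the operator pushes the job id under the dataflow_job_id XCom key, and that DataflowJobStatusSensor / DataflowJobStatus from the Google provider are the right tools for the check.

    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

    # With wait_until_finished=False the operator should return right after
    # submission, so a sensor polls Dataflow for the terminal state.
    wait_for_dataflow_job = DataflowJobStatusSensor(
        task_id="wait_for_dataflow_job",
        job_id="{{ task_instance.xcom_pull(task_ids='start_dataflow_job', key='dataflow_job_id') }}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        project_id=PROJECT,
        location="us-east1",
        gcp_conn_id="google_cloud_default",
    )

    start_dataflow_job >> wait_for_dataflow_job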

The logs are:

[2024-11-04, 04:31:42 UTC] {beam.py:151} INFO - Start waiting for Apache Beam process to complete.
[2024-11-04, 04:41:09 UTC] {beam.py:172} INFO - Process exited with return code: 0
[2024-11-04, 04:41:11 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-11-04, 04:41:11 UTC] {taskinstance.py:1206} INFO - Marking task as SUCCESS. dag_id=test_data_pipeline, task_id=start_dataflow_job, run_id=manual__2024-11-04T04:24:59.026429+00:00, execution_date=20241104T042459, start_date=20241104T042502, end_date=20241104T044111
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:243} INFO - Task exited with return code 0
[2024-11-04, 04:41:12 UTC] {taskinstance.py:3506} INFO - 1 downstream tasks scheduled from follow-on schedule check
[2024-11-04, 04:41:12 UTC] {local_task_job_runner.py:222} ▲▲▲ Log group end

Solution

  • Airflow uses the standard Python logging framework to write logs, and for the duration of a task the root logger is configured to write to that task’s log. So to see the Dataflow pipeline's progress in Airflow, the logging level in the Dataflow pipeline itself must be set to INFO; I had originally set it to ERROR. Once I raised the logging level, the operator submitted the job, pushed the dataflow_job_id to XCom, marked itself as success shortly after, and the follow-up sensor then tracked the job status to completion.

    import logging
    logging.getLogger().setLevel(logging.INFO)
    

    Read more here: Writing to Airflow task logs from your code
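
For completeness, here is a minimal sketch of where that call sits in the pipeline's entry point. The pipeline body is a placeholder (the Create/Map steps are invented for illustration); the only relevant part is setting the root logger to INFO before the pipeline runs.

    import logging

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions


    def run(argv=None):
        # Placeholder pipeline body for illustration only.
        options = PipelineOptions(argv)
        with beam.Pipeline(options=options) as p:
            (
                p
                | "Create" >> beam.Create(["hello", "world"])
                | "Log" >> beam.Map(lambda x: logging.info("element: %s", x) or x)
            )


    if __name__ == "__main__":
        # Root logger at INFO (not ERROR) so Beam's job submission and Dataflow
        # status messages reach the Airflow task log.
        logging.getLogger().setLevel(logging.INFO)
        run()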