airflowairflow-2.x

How to read http status code in downstream AirFlow task


Given I have following task and operators AirFlow code in my DAG:

    @task.branch(task_id="check_req")
    def check_req(**kwargs):
        response = kwargs['ti'].xcom_pull(task_ids='get_req')
        print(f"Response data: {response}")
        if response == 200:
            return "good"
        else:
            return "bad"

    get_req = SimpleHttpOperator(
        task_id='get_req',
        http_conn_id="http_conn",
        endpoint='endpoint/resource',
        method='GET',
        headers={"Content-Type": "application/json"},
    )

    check_req_op = check_req()
    begin = EmptyOperator(task_id="begin")
    end = EmptyOperator(task_id="end", trigger_rule="none_failed_min_one_success")
    good = EmptyOperator(task_id="good")
    bad = EmptyOperator(task_id="bad")

    begin >> get_req >> check_req_op
    check_req_op >> good >> end
    check_req_op >> bad >> end

In the check_req task I want to check the http status code but I am just getting the response body. How can I get the http response status code?


Solution

  • The SimpleHttpOperator accepts an argument response_filter which accepts a function as its argument. It passes this function the entire response object including the http status code. This makes it possible to transform the response and just return the http status code.

    get_req = SimpleHttpOperator(
        task_id='get_req',
        http_conn_id="http_conn",
        endpoint='endpoint/resource',
        method='GET',
        headers={"Content-Type": "application/json"},
        extra_options={"check_response": False},
        response_check=lambda response: True if allow_all(response) is True else False,
        response_filter=lambda response: response.status_code,
    )
    

    NOTE

    By default, the SimpleHttpOperator fails when 404 response is returned by the endpoint. In my case, I did not want the DAG to fail but I wanted to run a different task if the http status code is not 200. To prevent the failure add the extra_options={"check_response": False} as shown above.