I have the following task and operators in my Airflow DAG:
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

@task.branch(task_id="check_req")
def check_req(**kwargs):
    response = kwargs['ti'].xcom_pull(task_ids='get_req')
    print(f"Response data: {response}")
    if response == 200:
        return "good"
    else:
        return "bad"

get_req = SimpleHttpOperator(
    task_id='get_req',
    http_conn_id="http_conn",
    endpoint='endpoint/resource',
    method='GET',
    headers={"Content-Type": "application/json"},
)

check_req_op = check_req()
begin = EmptyOperator(task_id="begin")
end = EmptyOperator(task_id="end", trigger_rule="none_failed_min_one_success")
good = EmptyOperator(task_id="good")
bad = EmptyOperator(task_id="bad")

begin >> get_req >> check_req_op
check_req_op >> good >> end
check_req_op >> bad >> end
In the check_req task I want to check the HTTP status code, but I only get the response body. How can I get the HTTP response status code?
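The SimpleHttpOperator accepts a response_filter argument, which takes a function. The operator calls this function with the full response object (a requests.Response, including its status_code) and returns whatever the function returns, and that return value is what gets pushed to XCom. So you can make the filter return just the HTTP status code, and the response == 200 comparison in check_req then works as intended: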
get_req = SimpleHttpOperator(
    task_id='get_req',
    http_conn_id="http_conn",
    endpoint='endpoint/resource',
    method='GET',
    headers={"Content-Type": "application/json"},
    # Don't fail the task on 4xx/5xx responses; let check_req branch instead
    extra_options={"check_response": False},
    # Accept every response; check_req decides what to do with it
    response_check=lambda response: True,
    # Push only the HTTP status code to XCom instead of the response body
    response_filter=lambda response: response.status_code,
)
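The hand-off between get_req and check_req can be checked without a running Airflow instance. The sketch below is a plain-Python simulation, assuming a hypothetical FakeResponse class in place of requests.Response (only status_code and text are modelled); response_filter and choose_branch are illustrative names that mirror the lambda and the check_req logic, not Airflow APIs.

```python
from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Hypothetical stand-in for requests.Response (only the fields used here)."""
    status_code: int
    text: str


def response_filter(response):
    # Same logic as the lambda passed to SimpleHttpOperator: keep only the status code.
    return response.status_code


def choose_branch(status_code):
    # Same decision as check_req: the returned string is the task_id to follow.
    return "good" if status_code == 200 else "bad"


# Simulate what get_req would push to XCom and what check_req would do with it.
for resp in (FakeResponse(200, '{"ok": true}'), FakeResponse(404, "Not Found")):
    pushed = response_filter(resp)
    print(resp.status_code, "->", choose_branch(pushed))
```

Running it prints `200 -> good` and `404 -> bad`: the branch only ever sees the integer, never the body.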
NOTE
By default, the SimpleHttpOperator fails when the endpoint returns an error status code (any 4xx or 5xx, such as 404), because the underlying HttpHook checks the response before handing it back. In my case, I did not want the DAG to fail; I wanted to run a different task whenever the HTTP status code is not 200. To prevent the failure, pass extra_options={"check_response": False} as shown above.
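A simplified model of the status-code check that check_response toggles, written as my own sketch rather than Airflow's source: when the check is on, the hook relies on requests' raise_for_status(), which raises for any code from 400 to 599, so get_req fails before check_req ever runs. The helper name would_fail_task is hypothetical, and the model ignores other failure causes such as connection errors or a response_check that returns False.

```python
def would_fail_task(status_code: int, check_response: bool = True) -> bool:
    """Hypothetical model: would this status code fail get_req?"""
    if not check_response:
        # Status check disabled: every status code reaches response_filter.
        return False
    # raise_for_status() treats 400-499 as client errors and 500-599 as server errors.
    return 400 <= status_code < 600


for code in (200, 404, 503):
    print(code, would_fail_task(code), would_fail_task(code, check_response=False))
```

With the default check, 404 and 503 both fail the task; with check_response set to False, all three codes reach the branch.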