Hi i am triggering a dag with a dag id dag_1 using airflow's REST api and python requests module and in this request I want to send the json data (dic) too. this is how i and sending
import json
import requests
from requests.auth import HTTPBasicAuth
dic = {
"flag": "flag",
"files": "files",
"upload_path": "config.UPLOAD_FOLDER",
"tmp_path": "config.TMP_FOLDER",
"dataset_id": "dataset_id",
"dicom_meta_data": "dicom_meta_data",
"user_id": "request.user.id",
"protocol": "http if request.is_secure() else http",
"current_site": "request.get_host()",
"deidentify": "deidentify",
"email": "request.user.email",
}
json_object = json.dumps(dic)
data = {
"conf": {},
"dag_run_id": "trigger_16",
"logical_date": "2022-08-29T11:33:49.726Z",
}
headers={
'Content-type':'application/json',
'Accept':'application/json'
}
json_payload = json.dumps(data)
r = requests.post("http://localhost:8080/api/v1/dags/dag_1/dagRuns", auth=HTTPBasicAuth("airflow", "airflow"), data=json_payload, headers=headers, json=json_object)
print(r.status_code)
print(r.text)
and its working fine. but now i want to access this json in my first task in airflow which i am not able to do. Can anyone help me in this please.
Thanks
If you want to pass the dict to your run in order to access it by dag_run.conf
or params
, you should add it to your run data:
import json
import requests
from requests.auth import HTTPBasicAuth
dic = {
"flag": "flag",
"files": "files",
"upload_path": "config.UPLOAD_FOLDER",
"tmp_path": "config.TMP_FOLDER",
"dataset_id": "dataset_id",
"dicom_meta_data": "dicom_meta_data",
"user_id": "request.user.id",
"protocol": "http if request.is_secure() else http",
"current_site": "request.get_host()",
"deidentify": "deidentify",
"email": "request.user.email",
}
data = {
"conf": dic,
"dag_run_id": "trigger_16",
"logical_date": "2022-08-29T11:33:49.726Z",
}
headers={
'Content-type':'application/json',
'Accept':'application/json'
}
json_payload = json.dumps(data)
r = requests.post("http://localhost:8080/api/v1/dags/dag_1/dagRuns", auth=HTTPBasicAuth("airflow", "airflow"), json=json_payload, headers=headers)
print(r.status_code)
print(r.text)