Disclaimer: I am fairly new to Airflow and would love any advice ^^
So I need to read data from BigQuery (in this case, a list of project_ids)
and, from that information, create a dynamically mapped task that runs operations on each project in that list (in my example, just printing the project ID in a new task).
This is my DAG config:
import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator
from airflow import XComArg


def printer_task(data_list):
    list = []
    for value in data_list:
        project_id = value.get('project')
        list.append(project_id)
    print(f"project_ID: {list}")


def fetch_data_from_bq():
    return BigQueryGetDataOperator(
        task_id="fetch_data_from_bq",
        project_id="Project-X",
        dataset_id="Dataset-X",
        table_id="Table-X",
        gcp_conn_id="bigquery_default",
        max_results=10,
        selected_fields='project',
        #as_dict=True
    )


with models.DAG(
    dag_id="dummy-task",
    default_args=default_args,
    schedule_interval='@daily',
) as dag:
    get_data = fetch_data_from_bq()
    start_task = DummyOperator(task_id="start_task", dag=dag)
    end_task = DummyOperator(task_id="end_task", dag=dag)
    create_mapped_tasks = PythonOperator.partial(
        task_id="create_mapped_tasks",
        python_callable=printer_task
    ).expand(op_args=XComArg(get_data))
    start_task >> get_data >> create_mapped_tasks >> end_task
In the picture you can see 10 mapped tasks being created, but they all fail.
Can someone please explain to me why? ^^ And maybe help me with a solution?
Cheers and thanks for your help.
I already tried different approaches, but I always get this error message in the mapped tasks:
[2024-02-13, 00:02:50 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 27244 for task create_mapped_tasks ('str' object has no attribute 'get'; 1423691)
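When you expand an Airflow operator over a list of values (here, the fetched project_ids), each mapped task receives and processes one element of that list, not the whole list. As far as I can tell, with as_dict left commented out each element is a row holding a single project string, and op_args unpacks that row into positional arguments, so printer_task is called with a plain string. Looping over that string yields single characters, and calling .get on a character raises the 'str' object has no attribute 'get' error you see.
To fix your issue, you can try: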
def printer_task(value):
    print(f"project_ID: {value}")
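If it helps to see why the original callable fails and this one works, here is a minimal plain-Python sketch that imitates the mapping without running Airflow. It is a simulation under stated assumptions, not Airflow's actual implementation: the `rows` sample data, the `run_mapped` helper and the `printer_task_original` / `printer_task_fixed` names are all hypothetical, and I am assuming the upstream task returns one single-element row per project.

```python
# Plain-Python simulation of mapping over op_args; no Airflow required.
# Assumption: the upstream task returned one single-element row per project.
rows = [["project-a"], ["project-b"], ["project-c"]]  # hypothetical sample data


def printer_task_original(data_list):
    # Callable from the question: expects a list of dicts.
    ids = []
    for value in data_list:
        ids.append(value.get("project"))
    return ids


def printer_task_fixed(value):
    # Callable from the answer: expects a single project id.
    message = f"project_ID: {value}"
    print(message)
    return message


def run_mapped(callable_, mapped_op_args):
    # One call per element, with the element unpacked as positional
    # arguments, mimicking how op_args reaches python_callable.
    return [callable_(*op_args) for op_args in mapped_op_args]


error_text = None
try:
    run_mapped(printer_task_original, rows)
except AttributeError as err:
    # The original callable iterates over the characters of a string.
    error_text = str(err)
    print(error_text)

results = run_mapped(printer_task_fixed, rows)
```

The first call reproduces the same AttributeError text as in your log, because each simulated task gets a string rather than a list of dicts. The second call runs once per project, which is what the mapped tasks should do.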