The Airflow task below throws the following error:
[2024-08-07T09:05:00.142+0000] {{xcom.py:664}} ERROR - Object of type GlueJobOperator is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.
Code:
import os
import sys
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task_group, task, dag
from airflow.operators.python import PythonOperator
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
from utils.environment_config import EnvironmentConfig # noqa: E402
config = EnvironmentConfig(__file__)
import json
params_one = ["value"]
params_two = ["1","2"]
params_three = [4, 12, 52]
params_four = [3]
param_five = ["col"]
playground_bucket = config.get_environment_variable("playground_bucket_name", default_var="undefined")
intermittent_data_location = config.get_environment_variable("stage3_output_intermittent_location", default_var="undefined")
stage3_task_role = config.get_environment_variable("stage3_task_role", default_var="undefined")
join_bridge_script = config.get_bridge_script("join_bridge_script.py")
#default_args={ "donot_pickle": "True" }
@dag(dag_id='chore_task_group_stage3', schedule=None, catchup=False)
def pipeline():
    @task
    def lag_tasks_with_filter(
        param_one,
        param_two,
        param_three,
        param_four,
        param_five,
        lag_task_role,
        intermittent_data_location,
        playground_bucket,
    ):
        return GlueJobOperator(
            task_id=f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",
            job_name=config.generate_job_name(f"param_four{param_four}-weeks{param_three}-" + f"filter{param_five}-job-{param_one}-{param_two}"),
            script_location=config.get_bridge_script("lags_bridge_script.py"),
            iam_role_name=lag_task_role,
            script_args={
                "--lagWithCatPath": f"s3://{intermittent_data_location}/output/with_cat" + f"/param_one={param_one}/param_two={param_two}",
                "--rawDataInputPath": f"s3://{playground_bucket}/output/oneyear" + f"/param_one={param_one}/param_two={param_two}/",
                "--numberOfLagWeeks": str(param_four),
                "--windowSizeWeeks": str(param_three),
                "--filterCol": param_five,
                "--taskId": f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",
            },
            create_job_kwargs={
                "WorkerType": "G.2X",
                "NumberOfWorkers": 5,
                "GlueVersion": "4.0",
                "DefaultArguments": {
                    "--job-language": "python",
                    "--enable-job-insights": "true",
                    "--enable-metrics": "true",
                    "--enable-auto-scaling": "true",
                    "--enable-observability-metrics": "true",
                    "--TempDir": f"s3://{config.get_environment_variable('glue_tmp_dir_location', default_var='undefined')}",
                    "--extra-py-files": config.get_asset_file_location(
                        "ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl"
                    ),
                    "--enable-spark-ui": "true",
                    "--spark-event-logs-path": f"s3://{config.get_environment_variable('glue_spark_ui_logs_location', default_var='undefined')}",
                },
            },
            update_config=True,
        )
    ts = DummyOperator(task_id='start')
    te = DummyOperator(task_id='end')

    t1 = lag_tasks_with_filter.partial(
        lag_task_role=stage3_task_role,
        intermittent_data_location=intermittent_data_location,
        playground_bucket=playground_bucket,
    ).expand(param_one=params_one, param_two=params_two, param_three=params_three, param_four=params_four, param_five=param_five)

    # setting dependencies
    ts >> t1 >> te


pipeline()
When removing return, the DAG passes but the Glue jobs don't get created and triggered. I want to keep the @task decorator syntax since it allows creating mapped instances with expand().
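As far as I understand, the return value of a @task-decorated function is pushed to XCom and serialized to JSON by the default XCom backend, which would explain the error when returning a GlueJobOperator instance; and when the return is removed, the operator is only instantiated inside the worker process and never registered with the DAG, so the Glue job is never created or triggered. A minimal illustration of the difference (the task and job names here are made up):

from airflow.decorators import task
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator


@task
def build_job_args(param):
    # A plain dict is JSON-serializable, so it can be pushed to XCom by the
    # default XCom backend without problems.
    return {"--filterCol": param}


@task
def build_glue_operator(param):
    # An operator instance is not JSON-serializable, so pushing it to XCom fails
    # with "Object of type GlueJobOperator is not JSON serializable". Even
    # without the return, the operator is only instantiated inside the running
    # worker process and is never added to the DAG, so nothing gets scheduled.
    # (task_id and job_name here are placeholders.)
    return GlueJobOperator(task_id=f"glue_{param}", job_name=f"job-{param}")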
I believe traditional operators also support the expand method, so there is no need for a @task wrapper here. See https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#mapping-with-non-taskflow-operators
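For example, a minimal sketch of mapping the operator directly, roughly along the lines of the linked docs (the DAG id, job name, role, and script location are placeholders): constant arguments go into partial(), the per-instance argument goes into expand().

from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator


@dag(dag_id="glue_mapping_sketch", schedule=None, catchup=False)
def glue_mapping_sketch():
    # Constant arguments go into partial(); the mapped argument goes into expand().
    GlueJobOperator.partial(
        task_id="lag_task",
        job_name="my-glue-job",  # placeholder
        iam_role_name="my-glue-role",  # placeholder
        script_location="s3://my-bucket/scripts/lags_bridge_script.py",  # placeholder
    ).expand(
        script_args=[
            {"--filterCol": "col", "--windowSizeWeeks": "4"},
            {"--filterCol": "col", "--windowSizeWeeks": "12"},
        ]
    )


glue_mapping_sketch()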
upd#1
Summary of changes (the code is based on the provided example): param_two and param_three are the mapped parameters, and expand_kwargs is used to pass the per-instance mapping for iteration.

from typing import Any
import os
import sys
from datetime import datetime, timedelta
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.decorators import task, dag
# sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
# from utils.environment_config import EnvironmentConfig # noqa: E402
# config = EnvironmentConfig(__file__)
import json
playground_bucket = "mydummys3bucket"  # config.get_environment_variable("playground_bucket_name", default_var="undefined")
intermittent_data_location = "myintermittent_data_location" # config.get_environment_variable("stage3_output_intermittent_location", default_var="undefined")
lag_task_role = "mystage3_task_role" # config.get_environment_variable("stage3_task_role", default_var="undefined")
join_bridge_script = "myjoin_bridge_script" # config.get_bridge_script("join_bridge_script.py")
param_one = ["value"]
param_two = ["1", "2"]
param_three = [4, 12, 52]
param_four = [3]
param_five = ["col"]
@dag(dag_id="chore_task_group_stage3", schedule=None, catchup=False)
def pipeline():
ts = EmptyOperator(task_id="start")
@task
def create_task_mappings(
param_one, param_two, param_three, param_four, param_five, intermittent_data_location, playground_bucket
) -> dict[str, Any]:
job_name = f"param_four{param_four}-weeks{param_three}-" + f"filter{param_five}-job-{param_one}-{param_two}"
script_args = {
"--lagWithCatPath": f"s3://{intermittent_data_location}/output/with_cat"
+ f"/param_one={param_one}/param_two={param_two}",
"--rawDataInputPath": f"s3://{playground_bucket}/output/oneyear"
+ f"/param_one={param_one}/param_two={param_two}/",
"--numberOfLagWeeks": str(param_four),
"--windowSizeWeeks": str(param_three),
"--filterCol": param_five,
"--taskId": f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",
}
return {"job_name": job_name, "script_args": script_args}
created_mappings = create_task_mappings.partial(
param_one=param_one,
param_four=param_four,
param_five=param_five,
intermittent_data_location=intermittent_data_location,
playground_bucket=playground_bucket,
).expand(param_three=param_three, param_two=param_two)
t1 = GlueJobOperator.partial(
task_id="lag_tasks_with_filter",
create_job_kwargs={
"WorkerType": "G.2X",
"NumberOfWorkers": 5,
"GlueVersion": "4.0",
"DefaultArguments": {
"--job-language": "python",
"--enable-job-insights": "true",
"--enable-metrics": "true",
"--enable-auto-scaling": "true",
"--enable-observability-metrics": "true",
"--TempDir": f"s3://{playground_bucket}/temp",
"--extra-py-files": "ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl",
"--enable-spark-ui": "true",
"--spark-event-logs-path": f"s3://{playground_bucket}/logs",
},
},
iam_role_name=lag_task_role,
script_location=join_bridge_script,
update_config=True,
).expand_kwargs(created_mappings)
te = EmptyOperator(task_id="end")
ts >> created_mappings >> t1 >> te
pipeline()
References:
Cross-product - https://www.astronomer.io/docs/learn/dynamic-tasks#cross-product
Assigning multiple parameters to a non-TaskFlow operator - https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#assigning-multiple-parameters-to-a-non-taskflow-operator
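To make the mapping behaviour concrete: calling expand() with two keyword lists produces the cross-product (here 3 x 2 = 6 mapped instances of create_task_mappings), and expand_kwargs() consumes the output of the mapped upstream task as a list of dicts, where each dict's keys become keyword arguments of one GlueJobOperator instance. A minimal standalone sketch of the same pattern using BashOperator (the DAG id, task ids, and commands are made up):

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator


@dag(dag_id="expand_kwargs_sketch", schedule=None, catchup=False)
def expand_kwargs_sketch():
    @task
    def build_kwargs(size, label):
        # One dict per mapped instance; the keys must be valid BashOperator arguments.
        return {"bash_command": f"echo size={size} label={label}"}

    # Cross-product: 3 sizes x 2 labels = 6 mapped build_kwargs instances.
    kwargs_list = build_kwargs.expand(size=[4, 12, 52], label=["1", "2"])

    # Each generated dict becomes the keyword arguments of one mapped BashOperator instance.
    BashOperator.partial(task_id="run").expand_kwargs(kwargs_list)


expand_kwargs_sketch()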