python, amazon-web-services, airflow, mwaa

Airflow DAG throws "Object of type GlueJobOperator is not JSON serializable"


The Airflow task below throws this error:

[2024-08-07T09:05:00.142+0000] {{xcom.py:664}} ERROR - Object of type GlueJobOperator is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.

Code:

import os
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task_group, task, dag
from airflow.operators.python import PythonOperator

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

from utils.environment_config import EnvironmentConfig  # noqa: E402

config = EnvironmentConfig(__file__)

import json


params_one = ["value"]
params_two = ["1","2"]

params_three = [4, 12, 52]
params_four = [3]
param_five = ["col"]

playground_bucket = config.get_environment_variable("playground_bucket_name", default_var="undefined")
intermittent_data_location = config.get_environment_variable("stage3_output_intermittent_location", default_var="undefined")
stage3_task_role = config.get_environment_variable("stage3_task_role", default_var="undefined")
join_bridge_script = config.get_bridge_script("join_bridge_script.py")


#default_args={ "donot_pickle": "True" }
@dag(dag_id='chore_task_group_stage3', schedule=None, catchup=False)
def pipeline():

    @task
    def lag_tasks_with_filter(
        param_one,
        param_two,
        param_three,
        param_four,
        param_five,
        lag_task_role,
        intermittent_data_location,
        playground_bucket
    ):

        return GlueJobOperator(
            task_id=f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",
            job_name=config.generate_job_name(f"param_four{param_four}-weeks{param_three}-" + f"filter{param_five}-job-{param_one}-{param_two}"),
            script_location=config.get_bridge_script("lags_bridge_script.py"),
            iam_role_name=lag_task_role,
            script_args={
                    "--lagWithCatPath": f"s3://{intermittent_data_location}/output/with_cat" + f"/param_one={param_one}/param_two={param_two}",
                    "--rawDataInputPath": f"s3://{playground_bucket}/output/oneyear" + f"/param_one={param_one}/param_two={param_two}/",
                    "--numberOfLagWeeks": str(param_four),
                    "--windowSizeWeeks": str(param_three),
                    "--filterCol": param_five,
                    "--taskId": f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",    
            },
            create_job_kwargs={
                    "WorkerType": "G.2X",
                    "NumberOfWorkers": 5,
                    "GlueVersion": "4.0",
                    "DefaultArguments": {
                                        "--job-language": "python",
                                        "--enable-job-insights": "true",
                                        "--enable-metrics": "true",
                                        "--enable-auto-scaling": "true",
                                        "--enable-observability-metrics": "true",
                                        "--TempDir": f"s3://{config.get_environment_variable('glue_tmp_dir_location', default_var='undefined')}",
                                        "--extra-py-files": config.get_asset_file_location(
                                            "ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl"
                                        ),
                                        "--enable-spark-ui": "true",
                                        "--spark-event-logs-path": f"s3://{config.get_environment_variable('glue_spark_ui_logs_location', default_var='undefined')}",
                                    },
            },
            update_config=True,
        )


    ts = DummyOperator(task_id='start')
    te = DummyOperator(task_id='end')
    t1 = lag_tasks_with_filter.partial(
        lag_task_role=stage3_task_role,
        intermittent_data_location=intermittent_data_location,
        playground_bucket=playground_bucket,
    ).expand(
        param_one=params_one,
        param_two=params_two,
        param_three=params_three,
        param_four=params_four,
        param_five=param_five,
    )

    # setting dependencies
    ts >> t1 >> te

pipeline()

When I remove the return statement, the DAG parses without errors, but the Glue jobs never get created or triggered.

I want to keep the @task decorator syntax, since it allows creating mapped instances with expand().


Solution

  • Traditional (non-TaskFlow) operators also support partial()/expand(), so there is no need for a @task wrapper here. See https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#mapping-with-non-taskflow-operators
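
    A minimal sketch of that pattern (the DAG id, job name, script location, and IAM role below are placeholders, not values from the original DAG): static arguments go into partial(), and the mapped argument goes into expand(), so each element of the expanded list becomes one mapped GlueJobOperator instance.

    from airflow.decorators import dag
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator


    @dag(dag_id="glue_mapping_sketch", schedule=None, catchup=False)
    def glue_mapping_sketch():
        # Static arguments go into partial(); the mapped argument goes into expand().
        # Each element of the script_args list becomes one mapped GlueJobOperator instance.
        GlueJobOperator.partial(
            task_id="run_glue_job",
            job_name="demo-glue-job",  # placeholder
            script_location="s3://demo-bucket/scripts/demo_script.py",  # placeholder
            iam_role_name="demo-glue-role",  # placeholder
        ).expand(
            script_args=[
                {"--param_one": "value", "--param_two": "1"},
                {"--param_one": "value", "--param_two": "2"},
            ],
        )


    glue_mapping_sketch()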

    Update #1

    Summary of changes (the code below is based on the example from the question):

    from typing import Any
    import os
    import sys
    from datetime import datetime, timedelta
    
    from airflow.operators.empty import EmptyOperator
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    from airflow.decorators import task, dag
    
    # sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
    
    # from utils.environment_config import EnvironmentConfig  # noqa: E402
    
    # config = EnvironmentConfig(__file__)
    
    import json
    
    
    playground_bucket = "mydummys3bucket"  # config.get_environment_variable("playground_bucket_name", default_var="undefined")
    intermittent_data_location = "myintermittent_data_location"  # config.get_environment_variable("stage3_output_intermittent_location", default_var="undefined")
    lag_task_role = "mystage3_task_role"  # config.get_environment_variable("stage3_task_role", default_var="undefined")
    join_bridge_script = "myjoin_bridge_script"  # config.get_bridge_script("join_bridge_script.py")
    
    param_one = ["value"]
    param_two = ["1", "2"]
    param_three = [4, 12, 52]
    param_four = [3]
    param_five = ["col"]
    
    
    @dag(dag_id="chore_task_group_stage3", schedule=None, catchup=False)
    def pipeline():
    
        ts = EmptyOperator(task_id="start")
    
        @task
        def create_task_mappings(
            param_one, param_two, param_three, param_four, param_five, intermittent_data_location, playground_bucket
        ) -> dict[str, Any]:
            job_name = f"param_four{param_four}-weeks{param_three}-" + f"filter{param_five}-job-{param_one}-{param_two}"
            script_args = {
                "--lagWithCatPath": f"s3://{intermittent_data_location}/output/with_cat"
                + f"/param_one={param_one}/param_two={param_two}",
                "--rawDataInputPath": f"s3://{playground_bucket}/output/oneyear"
                + f"/param_one={param_one}/param_two={param_two}/",
                "--numberOfLagWeeks": str(param_four),
                "--windowSizeWeeks": str(param_three),
                "--filterCol": param_five,
                "--taskId": f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",
            }
    
            return {"job_name": job_name, "script_args": script_args}
    
        created_mappings = create_task_mappings.partial(
            param_one=param_one,
            param_four=param_four,
            param_five=param_five,
            intermittent_data_location=intermittent_data_location,
            playground_bucket=playground_bucket,
        ).expand(param_three=param_three, param_two=param_two)
    
        t1 = GlueJobOperator.partial(
            task_id="lag_tasks_with_filter",
            create_job_kwargs={
                "WorkerType": "G.2X",
                "NumberOfWorkers": 5,
                "GlueVersion": "4.0",
                "DefaultArguments": {
                    "--job-language": "python",
                    "--enable-job-insights": "true",
                    "--enable-metrics": "true",
                    "--enable-auto-scaling": "true",
                    "--enable-observability-metrics": "true",
                    "--TempDir": f"s3://{playground_bucket}/temp",
                    "--extra-py-files": "ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl",
                    "--enable-spark-ui": "true",
                    "--spark-event-logs-path": f"s3://{playground_bucket}/logs",
                },
            },
            iam_role_name=lag_task_role,
            script_location=join_bridge_script,
            update_config=True,
        ).expand_kwargs(created_mappings)
    
        te = EmptyOperator(task_id="end")
    
        ts >> created_mappings >> t1 >> te
    
    
    pipeline()
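
    Why this avoids the original error: the @task function now returns only a dict of plain strings (job_name and script_args), which XCom can serialize as JSON, and the GlueJobOperator itself is declared with partial()/expand_kwargs() in the DAG file, so Airflow schedules it as a real (mapped) task instead of receiving it as a return value from a running task.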
    

    References:
    Cross-product - https://www.astronomer.io/docs/learn/dynamic-tasks#cross-product
    Multiple params to a traditional operator - https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#assigning-multiple-parameters-to-a-non-taskflow-operator
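
    On the cross-product reference: in the DAG above, expand(param_three=param_three, param_two=param_two) maps over the cross-product of the two lists, i.e. 3 x 2 = 6 mapped instances of create_task_mappings and therefore 6 mapped GlueJobOperator runs. A standalone sketch of the same behavior (the DAG and task names here are made up for illustration):

    from airflow.decorators import dag, task


    @dag(dag_id="cross_product_demo", schedule=None, catchup=False)
    def cross_product_demo():

        @task
        def combine(a, b):
            # Runs once per (a, b) combination.
            return f"{a}-{b}"

        # expand() over two keyword lists maps the cross-product:
        # a in [4, 12, 52] and b in ["1", "2"] -> 6 mapped task instances.
        combine.expand(a=[4, 12, 52], b=["1", "2"])


    cross_product_demo()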