python apache-spark amazon-emr dagster

How to create an EMR cluster and submit a spark-submit step using Dagster?


I want to create a Dagster app that creates an EMR cluster and adds a spark-submit step, but due to a lack of documentation or examples I can't figure out how to do that (copilot also struggles with it :-)).

The idea is to create a scheduler with Dagster that creates an EMR cluster and runs a Scala Spark app as one of its steps.

Here's the code I have (it's not working correctly, but it should give a sense of what I was trying to do):

from dagster_shell import create_shell_command_op
from dagster_aws.emr.emr import EmrJobRunner
from dagster import graph, op

@op
def create_emr_cluster(context):
    emr_job_runner = EmrJobRunner('us-east-1', aws_access_key_id='ACCESS_KEY', aws_secret_access='SECRET_KEY')
    cluster_id = emr_job_runner.create_cluster()
    step_dict = emr_job_runner.construct_step_dict_for_command('Spark Step', 'spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster  s3://my-bucket/spark-examples.jar stage')
    emr_job_runner.add_job_flow_steps(None, cluster_id, [step_dict])

@graph
def my_graph():
    # a = create_shell_command_op('echo "hello, world!"', name="a") # this will invoke spark-submit on an existing cluster
    # a()
    create_emr_cluster()

my_job = my_graph.to_job()

How can I do it?


Solution

  • Most of your components were set up correctly. What was missing is the EMR job flow configuration, which specifies the applications to install on the cluster (Spark here), the master/core/task instance groups, and so on. More details here: https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html

    The EmrJobRunner class in dagster-aws has a run_job_flow method that takes this configuration and creates the cluster. Here is a sample snippet:

    from dagster_aws.emr import EmrJobRunner
    REGION="us-east-1"
    emr_cluster_config = {
        "Applications": [
            {
                "Name": "Spark"
            }
        ],
        "JobFlowRole": "SomeRole",
        "Instances": {
            "Ec2SubnetId": "subnet-1",
            "EmrManagedSlaveSecurityGroup": "sg-slave",
            "EmrManagedMasterSecurityGroup": "sg-master",
            "KeepJobFlowAliveWhenNoSteps": True,
            "TerminationProtected": False,
            "InstanceGroups": [
                {
                    "InstanceCount": 1,
                    "EbsConfiguration": {
                        "EbsBlockDeviceConfigs": [
                            {
                                "VolumeSpecification": {
                                    "SizeInGB": 32,
                                    "VolumeType": "gp3"
                                },
                                "VolumesPerInstance": 2
                            }
                        ]
                    },
                    "InstanceRole": "MASTER",
                    "InstanceType": "r6g.2xlarge",
                    "Name": "EMR Master"
                },
                {
                    "InstanceCount": 2,
                    "EbsConfiguration": {
                        "EbsBlockDeviceConfigs": [
                            {
                                "VolumeSpecification": {
                                    "SizeInGB": 256,
                                    "VolumeType": "gp3"
                                },
                                "VolumesPerInstance": 2
                            }
                        ]
                    },
                    "InstanceRole": "CORE",
                    "InstanceType": "r6g.2xlarge",
                    "Name": "EMR Core"
                },
                {
                    "InstanceCount":2,
                    "EbsConfiguration": {
                        "EbsBlockDeviceConfigs": [
                            {
                                "VolumeSpecification": {
                                    "SizeInGB": 256,
                                    "VolumeType": "gp3"
                                },
                                "VolumesPerInstance": 2
                            }
                        ]
                    },
                    "InstanceRole": "TASK",
                    "InstanceType": "r6g.2xlarge",
                    "Name": "EMR Task"
                }
            ]
        },
        "StepConcurrencyLevel": 1,
        "ReleaseLabel": "emr-5.36.0",
        "LogUri": "s3n://<somebucket>/logs/",
        "EbsRootVolumeSize": 32,
        "ServiceRole": "emr-role",
        "Name": "<cluster_name>"
    }
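    import logging

    log = logging.getLogger(__name__)  # inside an op, pass context.log instead
    emr = EmrJobRunner(region=REGION)
    # This step creates the cluster. run_job_flow and add_job_flow_steps take
    # a logger as their first argument (the question's code passed None there).
    cluster_id = emr.run_job_flow(log, emr_cluster_config)
    step_name = 'test_step'
    step_cmd = ['ls', '/']
    step_ids = emr.add_job_flow_steps(
        log, cluster_id, [emr.construct_step_dict_for_command(step_name, step_cmd)]
    )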
    

    You can also look at the EMR test cases in the Dagster repo; they provide good examples of the same calls.
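
The sample above submits `ls /` as its step. For the spark-submit case from the question, the step could be built roughly like this. This is a minimal sketch, assuming the step runs through EMR's `command-runner.jar` and that the command is passed as a list of arguments rather than one string (as in the `['ls', '/']` example). `build_spark_submit_step` is a hypothetical helper, not part of dagster-aws, and the bucket and jar names are the placeholders from the question.

```python
import shlex


def build_spark_submit_step(name, main_class, jar_uri, app_args=(),
                            action_on_failure="CONTINUE"):
    """Return an EMR step dict that runs spark-submit via command-runner.jar.

    Hypothetical helper: it only assembles a plain dict and calls no AWS API.
    """
    command = [
        "spark-submit",
        "--class", main_class,
        "--deploy-mode", "cluster",
        jar_uri,
        *app_args,
    ]
    return {
        "Name": name,
        "ActionOnFailure": action_on_failure,
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": command},
    }


# Values taken from the question; replace them with your own jar and class.
spark_step = build_spark_submit_step(
    name="Spark Step",
    main_class="org.apache.spark.examples.SparkPi",
    jar_uri="s3://my-bucket/spark-examples.jar",
    app_args=["stage"],
)

# If the command only exists as a single string, shlex.split turns it
# into the argument list that the step expects.
args_from_string = shlex.split(
    "spark-submit --class org.apache.spark.examples.SparkPi "
    "--deploy-mode cluster  s3://my-bucket/spark-examples.jar stage"
)
```

The resulting `spark_step` dict can go in the list given to `add_job_flow_steps` in place of the `ls /` step. To run this from Dagster, the cluster creation and step submission would sit inside an `@op`, as in the question's `create_emr_cluster`.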