I want to create a Dagster app that creates an EMR cluster and adds a spark-submit step, but due to a lack of documentation and examples I can't figure out how to do it (Copilot also struggles with it :-)).
The idea is to create a scheduler with Dagster that creates an EMR cluster and runs a Scala Spark app as one of its steps.
Here's the code I have (it's not working correctly, but it should give you a sense of what I was trying to do):
from dagster_shell import create_shell_command_op
from dagster_aws.emr.emr import EmrJobRunner
from dagster import graph, op


@op
def create_emr_cluster(context):
    emr_job_runner = EmrJobRunner('us-east-1', aws_access_key_id='ACCESS_KEY', aws_secret_access_key='SECRET_KEY')
    cluster_id = emr_job_runner.create_cluster()
    step_dict = emr_job_runner.construct_step_dict_for_command('Spark Step', 'spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster s3://my-bucket/spark-examples.jar stage')
    emr_job_runner.add_job_flow_steps(None, cluster_id, [step_dict])


@graph
def my_graph():
    # a = create_shell_command_op('echo "hello, world!"', name="a")  # this will invoke spark-submit on an existing cluster
    # a()
    create_emr_cluster()


my_job = my_graph.to_job()
How can I do it?
You had most of your components set up correctly. You were only missing the EMR job flow settings, which specify the applications you want installed on the cluster, the core/task node setup, and so on. More details here: https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
The Dagster API has a function, run_job_flow, which takes this configuration and creates a cluster.
Here's a sample code snippet:
from dagster_aws.emr import EmrJobRunner

REGION = "us-east-1"

emr_cluster_config = {
    "Applications": [
        {
            "Name": "Spark"
        }
    ],
    "JobFlowRole": "SomeRole",
    "Instances": {
        "Ec2SubnetId": "subnet-1",
        "EmrManagedSlaveSecurityGroup": "sg-slave",
        "EmrManagedMasterSecurityGroup": "sg-master",
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
        "InstanceGroups": [
            {
                "InstanceCount": 1,
                "EbsConfiguration": {
                    "EbsBlockDeviceConfigs": [
                        {
                            "VolumeSpecification": {
                                "SizeInGB": 32,
                                "VolumeType": "gp3"
                            },
                            "VolumesPerInstance": 2
                        }
                    ]
                },
                "InstanceRole": "MASTER",
                "InstanceType": "r6g.2xlarge",
                "Name": "EMR Master"
            },
            {
                "InstanceCount": 2,
                "EbsConfiguration": {
                    "EbsBlockDeviceConfigs": [
                        {
                            "VolumeSpecification": {
                                "SizeInGB": 256,
                                "VolumeType": "gp3"
                            },
                            "VolumesPerInstance": 2
                        }
                    ]
                },
                "InstanceRole": "CORE",
                "InstanceType": "r6g.2xlarge",
                "Name": "EMR Core"
            },
            {
                "InstanceCount": 2,
                "EbsConfiguration": {
                    "EbsBlockDeviceConfigs": [
                        {
                            "VolumeSpecification": {
                                "SizeInGB": 256,
                                "VolumeType": "gp3"
                            },
                            "VolumesPerInstance": 2
                        }
                    ]
                },
                "InstanceRole": "TASK",
                "InstanceType": "r6g.2xlarge",
                "Name": "EMR Task"
            }
        ]
    },
    "StepConcurrencyLevel": 1,
    "ReleaseLabel": "emr-5.36.0",
    "LogUri": "s3n://<somebucket>/logs/",
    "EbsRootVolumeSize": 32,
    "ServiceRole": "emr-role",
    "Name": "<cluster_name>"
}
emr = EmrJobRunner(region=REGION)

# This step creates the cluster
cluster_id = emr.run_job_flow(emr_cluster_config)

# Add a step to the running cluster; the command is passed as a list of arguments
step_name = 'test_step'
step_cmd = ['ls', '/']
step_ids = emr.add_job_flow_steps(
    cluster_id, [emr.construct_step_dict_for_command(step_name, step_cmd)]
)
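To tie this back to the original question, here is a minimal sketch of how the snippet above could be wrapped in a Dagster op and put on a schedule. It reuses the emr_cluster_config dict defined above and follows the same call style as the snippet (EmrJobRunner method signatures can vary between dagster-aws versions, so check yours); the jar path, main class, and cron expression are placeholders.

from dagster import ScheduleDefinition, graph, op
from dagster_aws.emr import EmrJobRunner

REGION = "us-east-1"


@op
def create_emr_cluster_and_submit(context):
    emr = EmrJobRunner(region=REGION)
    # Create the cluster from the emr_cluster_config dict defined above
    cluster_id = emr.run_job_flow(emr_cluster_config)
    # The spark-submit command is passed as a list of arguments,
    # just like ['ls', '/'] above; the jar path and class are placeholders
    step_cmd = [
        "spark-submit",
        "--class", "org.apache.spark.examples.SparkPi",
        "--deploy-mode", "cluster",
        "s3://my-bucket/spark-examples.jar",
        "stage",
    ]
    step_dict = emr.construct_step_dict_for_command("Spark Step", step_cmd)
    emr.add_job_flow_steps(cluster_id, [step_dict])


@graph
def my_graph():
    create_emr_cluster_and_submit()


my_job = my_graph.to_job()

# Run the job once a day at 06:00 (adjust the cron expression as needed)
my_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 6 * * *")

Also note that KeepJobFlowAliveWhenNoSteps is set to True in the config above, so the cluster stays up after the step finishes; set it to False if you want the cluster to terminate once all its steps have completed.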
You can also look at the test cases in the Dagster repo; they provide very good examples of the same.