I have Airflow jobs which run fine on an EMR cluster. What I need is this: say I have 4 Airflow jobs that each require an EMR cluster for roughly 20 minutes to complete their task. Why not create the EMR cluster at DAG run time and, once the job finishes, terminate the cluster that was created?
4 Answers
Absolutely, that would be the most efficient use of resources. Let me warn you: there are a lot of details in this; I'll try to list as many as should get you going. I encourage you to add your own comprehensive answer listing any problems you encountered and their workarounds (once you are through this).
Regarding cluster creation / termination
- For cluster creation and termination, you have `EmrCreateJobFlowOperator` and `EmrTerminateJobFlowOperator` respectively.
- Don't fret if you do not use an AWS `SecretAccessKey` (and rely wholly on IAM Roles); instantiating any AWS-related hook or operator in Airflow will automatically fall back to the underlying EC2 instance's attached IAM Role.
- If you're NOT using the EMR-Steps API for job submission, then you'll also have to manually sense both of the above operations using Sensors. There's already a sensor for polling the creation phase, called `EmrJobFlowSensor`, and you can modify it slightly to create a sensor for termination too (a sketch follows this list).
- You pass your cluster-config JSON in `job_flow_overrides`. You can also pass configs in a Connection's (like `my_emr_conn`) `extra` param, but refrain from that because it often breaks SQLAlchemy ORM loading (since it's a big JSON).
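A minimal sketch of what such a termination sensor could look like, assuming Airflow 2.x import paths and polling the cluster state directly with boto3 rather than modifying `EmrJobFlowSensor` itself (the class name and region default are illustrative assumptions):

import boto3
from airflow.sensors.base import BaseSensorOperator


class EmrTerminationSensor(BaseSensorOperator):
    """Pokes the EMR DescribeCluster API until the given cluster is terminated."""

    def __init__(self, job_flow_id, region_name='us-east-1', **kwargs):
        super().__init__(**kwargs)
        self.job_flow_id = job_flow_id
        self.region_name = region_name

    def poke(self, context):
        emr = boto3.client('emr', region_name=self.region_name)
        state = emr.describe_cluster(ClusterId=self.job_flow_id)['Cluster']['Status']['State']
        self.log.info('Cluster %s is in state %s', self.job_flow_id, state)
        if state == 'TERMINATED_WITH_ERRORS':
            raise ValueError(f'Cluster {self.job_flow_id} terminated with errors')
        return state == 'TERMINATED'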
Regarding job submission
- You can submit jobs to EMR using the EMR-Steps API, either during the cluster-creation phase (within the cluster-config JSON) or afterwards using `add_job_flow_steps()`. There's even an `EmrAddStepsOperator` in Airflow, which also requires an `EmrStepSensor`. You can read more about it in the AWS docs, and you might also have to use `command-runner.jar`.
- For application-specific cases (like Hive or Livy), you can use their specific ways. For instance, you can use `HiveServer2Hook` to submit a Hive job. Here's the tricky part: the `run_job_flow()` call (made during the cluster-creation phase) only gives you a `job_flow_id` (cluster-id). You'll have to use a `describe_cluster()` call via `EmrHook` to obtain the private IP of the master node. Using this, you can then programmatically create a Connection (such as a Hive Server 2 Thrift connection) and use it for submitting your computations to the cluster. And don't forget to delete those connections (for elegance) before completing your workflow. A sketch of this lookup-and-register step follows this list.
- Finally, there's good old bash for interacting with the cluster. For this you should also pass an EC2 key pair during the cluster-creation phase. Afterwards, you can programmatically create an SSH connection and use it (with an `SSHHook` or `SSHOperator`) for running jobs on your cluster. Read more about SSH-related functionality in Airflow here.
- Particularly for submitting Spark jobs to a remote EMR cluster, read this discussion.
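Here is a minimal sketch, assuming boto3 and the Airflow metadata-DB session, of how that lookup-and-register step might look. The connection id `emr_hive_conn` and the use of `MasterPublicDnsName` (which resolves to the master node's private IP inside the VPC) are illustrative assumptions, not a fixed API:

import boto3
from airflow.models import Connection
from airflow.settings import Session


def register_hive_connection(job_flow_id, region_name='us-east-1'):
    """Look up the master node of a running cluster and register a HiveServer2 connection."""
    emr = boto3.client('emr', region_name=region_name)
    # Inside a VPC, the cluster's MasterPublicDnsName resolves to the master's private IP.
    master_host = emr.describe_cluster(ClusterId=job_flow_id)['Cluster']['MasterPublicDnsName']

    session = Session()
    session.add(Connection(
        conn_id='emr_hive_conn',   # hypothetical connection id, referenced later by HiveServer2Hook
        conn_type='hiveserver2',
        host=master_host,
        port=10000,                # default HiveServer2 Thrift port
    ))
    session.commit()
    session.close()
    return master_host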

- Thanks for your detailed support. I have two cloud setups, Google and AWS. Can we have something like this in GCP as well? – gaurav agnihotri Mar 19 '19 at 04:25
- `Airflow` has [quite a few](https://imgur.com/D1F02Fc) `hook`s / `operator`s for `Google Cloud` as well. And of course [`Google Cloud Composer`](https://cloud.google.com/blog/products/gcp/cloud-composer-is-now-in-beta-build-and-run-practical-workflows-with-minimal-effort) is a fully managed service. Unfortunately I haven't worked with `GCP`, so can't tell you any more than this – y2k-shubham Mar 19 '19 at 06:25
The best way to do this is as below:
create EMR cluster >> run spark application >> wait to complete spark application >> terminate EMR cluster
import time
from airflow.operators.python import PythonOperator
from datetime import timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.dates import days_ago
# Spark-submit command for the application
SPARK_APP = [
    {
        'Name': 'spark_app1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                '--class', 'package_path_to_main',
                'location_of_jar',
                # application arguments, if any, go here
            ],
        },
    }
]
# EMR cluster configuration
JOB_FLOW_OVERRIDES = {
    'Name': 'emr_cluster_name',
    'ReleaseLabel': 'emr-6.4.0',
    'Applications': [{'Name': 'Spark'}],
    'LogUri': 's3_path_for_log',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'r5.8xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Slave nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'r5.8xlarge',
                'InstanceCount': 32,
            },
        ],
        'Ec2SubnetId': 'subnet-id',
        'Ec2KeyName': 'KeyPair',
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
        'AdditionalMasterSecurityGroups': ['security-group'],
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'SecurityConfiguration': 'SecurityConfig_name',
    'ServiceRole': 'EMR_DefaultRole',
    'StepConcurrencyLevel': 10,
}
# Airflow DAG definition
with DAG(
    dag_id='dag_name',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['email-address'],
        'email_on_failure': True,
        'email_on_retry': False,
    },
    dagrun_timeout=timedelta(hours=4),
    start_date=days_ago(1),
    schedule_interval='0 * * * *',
    catchup=False,
    tags=['example'],
) as dag:

    # Create the EMR cluster
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='cluster_creator',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    # Add a step to run the Spark application
    step_adder_1 = EmrAddStepsOperator(
        task_id='step_adder_1',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='cluster_creator', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_APP,
        trigger_rule='all_done',
    )

    # Step sensor to track completion of the step added above
    step_checker_1 = EmrStepSensor(
        task_id='step_checker_1',
        job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='step_adder_1', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',
    )

    # Terminate the EMR cluster once all tasks running on it have completed
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='cluster_remover',
        job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',
    )

    # Define the task order
    cluster_creator >> step_adder_1 >> step_checker_1 >> cluster_remover

The best way to do this is probably to have a node at the root of your Airflow DAG that creates the EMR cluster, and then another node at the very end of the DAG that spins the cluster down after all of the other nodes have completed.
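A compact sketch of that shape, assuming the same Amazon provider operators used in the answer above (the DAG id, placeholder task, and minimal job_flow_overrides are illustrative); setting trigger_rule='all_done' on the terminate task means the cluster is removed even if an intermediate task fails:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='emr_lifecycle_example', start_date=days_ago(1), schedule_interval=None) as dag:
    # Root node: spin up the transient cluster
    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster',
        job_flow_overrides={'Name': 'transient_cluster'},  # full config as in the answer above
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    # Placeholder for the real work (EMR steps, Hive jobs, etc.)
    run_jobs = DummyOperator(task_id='run_jobs')

    # Final node: spin the cluster down after everything else has completed
    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_cluster', key='return_value') }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',  # tear down even if an intermediate task failed
    )

    create_cluster >> run_jobs >> terminate_cluster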

Check my implementation: the DAG will create an EMR cluster, run the Spark job against the data in S3, and terminate the cluster automatically once done.
https://beyondexperiment.com/vijayravichandran06/aws-emr-orchestrate-with-airflow/
