4

I have Airflow jobs which run fine on a long-lived EMR cluster. What I need is this: say I have 4 Airflow jobs, each of which needs an EMR cluster for roughly 20 minutes to complete its task. Why can't we create an EMR cluster at DAG run time, and terminate that cluster once the job finishes?

gaurav agnihotri
  • 259
  • 1
  • 5
  • 14

4 Answers

9

Absolutely, that would be the most efficient use of resources. But let me warn you: there are a lot of details involved in this. I'll try to list as many as will get you going, and I encourage you, once you are through it, to add your own comprehensive answer listing any problems you encountered and their workarounds.


Regarding cluster creation / termination: use EmrCreateJobFlowOperator to spin the cluster up at the start of the DAG and EmrTerminateJobFlowOperator to tear it down at the end.
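Under the hood those operators are thin wrappers over two EMR API calls. As a hedged sketch of what "create at run time, terminate when done" looks like at that level (the instance types, counts, and role names below are illustrative assumptions, not recommendations):

```python
def build_job_flow_request(name, release_label="emr-6.4.0", core_count=2):
    """Minimal RunJobFlow request; instance types and counts are placeholder choices."""
    return {
        "Name": name,
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "InstanceGroups": [
                {"Name": "Master", "Market": "ON_DEMAND", "InstanceRole": "MASTER",
                 "InstanceType": "m5.xlarge", "InstanceCount": 1},
                {"Name": "Core", "Market": "ON_DEMAND", "InstanceRole": "CORE",
                 "InstanceType": "m5.xlarge", "InstanceCount": core_count},
            ],
            # Keep the cluster alive between steps so the orchestrator owns its lifetime.
            "KeepJobFlowAliveWhenNoSteps": True,
            "TerminationProtected": False,
        },
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }


def create_cluster(name):
    import boto3  # deferred import so the request builder stays usable without boto3
    return boto3.client("emr").run_job_flow(**build_job_flow_request(name))["JobFlowId"]


def terminate_cluster(job_flow_id):
    import boto3
    boto3.client("emr").terminate_job_flows(JobFlowIds=[job_flow_id])
```

In Airflow terms, `create_cluster` corresponds to EmrCreateJobFlowOperator (whose job_flow_overrides takes a dict shaped like this request) and `terminate_cluster` to EmrTerminateJobFlowOperator.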


Regarding job submission: use EmrAddStepsOperator to submit the Spark application as an EMR step, paired with an EmrStepSensor that waits for the step to complete.
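Job submission boils down to adding an EMR "step" that wraps spark-submit via command-runner.jar, then polling that step until it finishes. A minimal sketch of the step payload (jar path, class name, and arguments are placeholders you would substitute):

```python
def build_spark_step(name, jar_path, main_class, app_args=()):
    """EMR step dict that runs spark-submit in cluster mode via command-runner.jar."""
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--master", "yarn",
                "--class", main_class,
                jar_path,
                *app_args,
            ],
        },
    }


def submit_step(job_flow_id, step):
    import boto3  # deferred import; the builder above needs no AWS access
    resp = boto3.client("emr").add_job_flow_steps(JobFlowId=job_flow_id, Steps=[step])
    return resp["StepIds"][0]  # poll this ID (or let EmrStepSensor do it) until COMPLETED
```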


y2k-shubham
  • 10,183
  • 11
  • 55
  • 131
  • Thanks for your detailed support. I have two cloud setups, Google and AWS. Can we have something like this in GCP as well? – gaurav agnihotri Mar 19 '19 at 04:25
  • `Airflow` has [quite a few](https://imgur.com/D1F02Fc) `hook`s / `operator`s for `Google Cloud` as well. And of course [`Google Cloud Composer`](https://cloud.google.com/blog/products/gcp/cloud-composer-is-now-in-beta-build-and-run-practical-workflows-with-minimal-effort) is fully managed service. Unfortunately I haven't worked with `GCP`, so can't tell you any more than this – y2k-shubham Mar 19 '19 at 06:25
1

The best way to do this is as below:

create EMR cluster >> run spark application >> wait to complete spark application >> terminate EMR cluster

from datetime import timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.dates import days_ago


# Spark-submit command for the application

SPARK_APP = [
    {
        'Name': 'spark_app1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                '--class', 'package_path_to_main',
                'location_of_jar',
                'application_args',  # replace with your application's arguments
            ],
        },
    }
]


# EMR cluster configuration

JOB_FLOW_OVERRIDES = {
    'Name': 'emr_cluster_name',
    'ReleaseLabel': 'emr-6.4.0',
    'Applications': [{"Name": "Spark"}],
    'LogUri': 's3_path_for_log',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'r5.8xlarge',
                'InstanceCount': 1
            },
            {
                'Name': 'Slave nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'r5.8xlarge',
                'InstanceCount': 32
            }
        ],
        'Ec2SubnetId': 'subnet-id',
        'Ec2KeyName': 'KeyPair',
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
        'AdditionalMasterSecurityGroups': ['security-group']
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'SecurityConfiguration': 'SecurityConfig_name',
    'ServiceRole': 'EMR_DefaultRole',
    'StepConcurrencyLevel': 10,
}

# Airflow DAG definition

with DAG(
    dag_id='dag_name',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['email-address'],
        'email_on_failure': True,
        'email_on_retry': False,
    },
    dagrun_timeout=timedelta(hours=4),
    start_date=days_ago(1),
    schedule_interval='0 * * * *',
    catchup=False,
    tags=['example'],
) as dag:

# EMR cluster creator 

    cluster_creator = EmrCreateJobFlowOperator(
        task_id='cluster_creator',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

# Step adder to submit the Spark application

    step_adder_1 = EmrAddStepsOperator(
        task_id='step_adder_1',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='cluster_creator', key='return_value')}}",
        aws_conn_id='aws_default',
        steps=SPARK_APP,
        trigger_rule='all_done',
    )

# Step sensor to track completion of the submitted step

    step_checker_1 = EmrStepSensor(
        task_id='step_checker_1',
        job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='step_adder_1', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',
    )

# Terminate the EMR cluster once all upstream tasks have finished
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='cluster_remover',
        job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',
    )

    # Define the task order

    cluster_creator >> step_adder_1 >> step_checker_1 >> cluster_remover
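A note on the step_id template above: EmrAddStepsOperator pushes the list of created step IDs to XCom, which is why the sensor indexes [0]. If one adder submits several Spark applications at once (the question mentions four jobs), you need one sensor per step, each indexing its own position in that list. A small hypothetical helper to generate those sensor templates:

```python
def step_sensor_template(adder_task_id, index):
    """Jinja template an EmrStepSensor's step_id would use to pick the index-th
    step ID pushed to XCom by the named EmrAddStepsOperator task."""
    return ("{{ task_instance.xcom_pull(task_ids='%s', key='return_value')[%d] }}"
            % (adder_task_id, index))
```

Each sensor then sits between the adder and cluster_remover, so the cluster is torn down only after every step has been checked (trigger_rule='all_done' on the remover makes termination happen even if a step fails).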
0

The best way to do this is probably to have a node at the root of your Airflow DAG that creates the EMR cluster, and then another node at the very end of the DAG that spins the cluster down after all of the other nodes have completed.

Max Gasner
  • 1,191
  • 1
  • 12
  • 18
0

Check my implementation: the DAG will create an EMR cluster, run the Spark job against the data in S3, and terminate the cluster automatically once done.

https://beyondexperiment.com/vijayravichandran06/aws-emr-orchestrate-with-airflow/

vijay
  • 49
  • 1
  • 2