
I am trying to create an AWS Lambda function in Python to launch an EMR cluster. Previously I was launching EMR using a bash script and a cron tab. Since my job runs only once a day, I am trying to move to Lambda, as invoking a cluster takes only a few seconds.

I wrote the script below to launch EMR, but I am getting a YARN support exception. What am I doing wrong here?

Exception

org.apache.spark.SparkException: Unable to load YARN support
    at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:405)
    at org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:400)
    at org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:400)
    at org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:425)
    at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:2387)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:156)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:351)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:432)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
    at com.mmt.sp.hotelsttlengine.util.SparkConnectionFactory.createNewOrExistingSession(SparkConnectionFactory.java:32)
    at com.mmt.sp.hotelsttlengine.starter.GenerateUnifiedErrRefreshReport.run(GenerateUnifiedErrRefreshReport.java:67)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
    at com.mmt.sp.hotelsttlengine.starter.GenerateUnifiedErrRefreshReport.main(GenerateUnifiedErrRefreshReport.java:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
    at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:401)

Working Bash Script

#!/usr/bin/env bash
. ./config.sh
# Termination protection off
TERMINATION_PROTECTED=--no-termination-protected

# Comment this out to turn auto-terminate off; on by default.
AUTO_TERMINATE=--auto-terminate



# EC2 key pair name. Change this as per the machine.
KEY_NAME=keyname

# S3 URI for logs
LOG_URI=s3://bucketpath/clusterlogs/

#source files path
S3_BUCKET_URL=s3://bucketpath/


# create cluster
json=$(aws emr create-cluster $TERMINATION_PROTECTED $AUTO_TERMINATE \
  --applications Name=Hadoop Name=Spark \
  --configurations file://spark-config.json \
  --release-label emr-5.8.0 \
  --ec2-attributes "{\"KeyName\":\"$KEY_NAME\",
                     \"SubnetId\":\"$SUBNET_ID\",
                     \"InstanceProfile\":\"EMR_EC2_DefaultRole\",
                     \"ServiceAccessSecurityGroup\":\"$SERVICE_ACCESS_SECURITY_GROUP\",
                     \"EmrManagedMasterSecurityGroup\":\"$EMR_MANAGED_MASTER_SECURITY_GROUP\",
                     \"EmrManagedSlaveSecurityGroup\":\"$EMR_MANAGED_SLAVE_SECURITY_GROUP\"}"\
  --service-role EMR_DefaultRole \
  --enable-debugging \
  --log-uri $LOG_URI \
  --instance-groups '[{"InstanceCount":1,
                       "InstanceGroupType":"MASTER",
                       "InstanceType":"m4.xlarge",
                       "Name":"Master instance group - 1"},
                      {"InstanceCount":6,
                       "InstanceGroupType":"CORE",
                       "InstanceType":"m4.2xlarge",
                       "Name":"Core instance group - 1"}]' \
  --name 'myjobname' \
  --region my-region \
  --steps "[{\"Args\":[
               \"--class\",\"com.test.MainClass\",
               \"s3://bucketpath/jars/application/myapplication.jar\",
               \"prod\",\"$arg1\",\"$arg2\",\"$arg3\",\"$arg4\"],
             \"Type\":\"SPARK\",
             \"ActionOnFailure\":\"$STEP_FAILURE_ACTION\",
             \"Properties\":\"\",
             \"Name\":\"mystep\"}]"
)


# check error; if none, get cluster_id
STATUS=$?
if [ $STATUS -ne 0 ]; then
  echo "Failed to create cluster" 1>&2
  exit $STATUS
fi
cluster_id=$(echo "$json" | grep '"ClusterId":' | cut -d'"' -f4)


# wait for it to run
SECONDS=0
echo -n Waiting for cluster $cluster_id to run ...
aws emr wait cluster-running --cluster-id $cluster_id
duration=$SECONDS
echo CLUSTER-ID: $cluster_id
echo STATUS:UP
echo TIME TAKEN: $duration SECONDS
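
The equivalent of "aws emr wait cluster-running" in boto3 is a waiter. A minimal sketch, assuming cluster_id holds the id returned by run_job_flow below (note that blocking like this inside a Lambda function can run into the Lambda timeout):

import boto3

emr = boto3.client('emr', region_name='my-region')

# Poll until the cluster reaches the RUNNING state; raises WaiterError
# if the cluster terminates instead.
emr.get_waiter('cluster_running').wait(ClusterId=cluster_id)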

Python Script


import boto3

LOG_URI = "s3://log path/"


S3_BUCKET_URL = "s3://bucket path/"


def launchCluster(**kwargs):
    connection = boto3.client('emr', region_name='region_name')

    cluster_id = connection.run_job_flow(Name='myjob name', LogUri=LOG_URI,
                                         ReleaseLabel='emr-5.28.0',
                                         ServiceRole='EMR_DefaultRole',
                                         Applications=[
                                             {
                                                 'Name': 'Hadoop'
                                             },
                                             {
                                                 'Name': 'Spark'
                                             }

                                         ],

                                         Configurations=[{
                                             "Classification": "spark-defaults",
                                             "Properties": {
                                                 "spark.eventLog.dir": "s3a://evnlogpath/spark-history/",
                                                 "spark.history.fs.logDirectory": "s3a://eventlogpath/spark-history/",
                                                 "spark.eventLog.enabled": "true",
                                                 "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
                                                 "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
                                                 "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs": "false",
                                                 "spark.hadoop.parquet.enable.summary-metadata": "false",
                                                 "spark.executor.memory": "15g",
                                                 "maximizeResourceAllocation": "true",
                                                 "spark.driver.memory": "14g",
                                                 "spark.driver.maxResultSize": "10g"

                                             }
                                         }, {
                                             "Classification": "spark-log4j",
                                             "Properties": {
                                                 "spark.log4j.rootCategory": "WARN",
                                                 "spark.log4j.appender.file.layout": "org.apache.log4j.EnhancedPatternLayout",
                                                 "spark.log4j.appender.file.layout.ConversionPattern": "%d{yy/MM/dd HH:mm:ss.SSS}{IST} %t %p %c{1}: %m%n"
                                             }
                                         }, {
                                             "Classification": "spark",
                                             "Properties": {
                                                 "maximizeResourceAllocation": "true"
                                             }
                                         }
                                         ],
                                         Instances={
                                             'InstanceGroups': [
                                                 {"InstanceCount": 1,
                                                  "InstanceRole": "MASTER",
                                                  "InstanceType": "m4.xlarge",
                                                  "Name": "Master instance group - 1"},
                                                 {"InstanceCount": 6,
                                                  "InstanceRole": "CORE",
                                                  "InstanceType": "m4.2xlarge",
                                                  "Name": "Core instance group - 1"}],

                                             "EmrManagedMasterSecurityGroup": "...",
                                             "EmrManagedSlaveSecurityGroup": "...",
                                             "ServiceAccessSecurityGroup": "...",
                                             "Ec2SubnetId": "...",
                                             # "InstanceRole":"MASTER",

                                             "Ec2KeyName": "...",
                                             "KeepJobFlowAliveWhenNoSteps": False,
                                             "TerminationProtected": False,
                                         },
                                         Steps=[

                                             {
                                                 "Name": "xz",
                                                 "ActionOnFailure": "TERMINATE_CLUSTER",
                                                 'HadoopJarStep': {
                                                     "Jar": "s3://bucket/jars/application/some-app.jar",
                                                     # "Properties": "",
                                                     "MainClass": "com.test.MainClass",
                                                     "Args": [...],
                                                 }
                                             }

                                         ],
                                         VisibleToAllUsers=True,
                                         JobFlowRole='EMR_EC2_DefaultRole',
                                         Tags=[
                                             {
                                                 'Key': '..',
                                                 'Value': '..',
                                             }...
                                         ],
                                         )
    print("cluster is created:", str(cluster_id))
    return cluster_id


launchCluster()
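
For reference, a minimal sketch of how this function could be wired up as the Lambda entry point; the handler name and return shape are assumptions, not part of the original script:

def lambda_handler(event, context):
    # run_job_flow returns a dict containing the new cluster's 'JobFlowId'.
    response = launchCluster()
    return {"ClusterId": response["JobFlowId"]}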

Spark Yarn

SparkSession spark = SparkSession.builder().master("yarn").appName(jobName)
        .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
        .config("spark.hadoop.parquet.enable.summary-metadata", "false")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .getOrCreate();
Gaurav

2 Answers


As I remember, the Hadoop JAR step uses command-runner to submit the fat JAR application to YARN. So I would say remove the .master("yarn") and try to run; if an error occurs, add it here in the post.

Abhi

Here is the step part that I use in my application with Python boto3 to launch EMR. You do not need to set the master to yarn in the Spark source.

...
        Steps=[{
            'Name': 'Main',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit',
                         '--master', 'yarn', '--deploy-mode', 'client',
                         '--class', '<class name>',
                         's3://bucket/path/<jar name>.jar', '<argument>'
                         ]
            }
        }],
...
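
Applied to the step in the question (the jar path and class name are taken from there; the spark-submit flags are the same assumptions as above), it would look roughly like:

        Steps=[{
            'Name': 'mystep',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                # command-runner.jar invokes spark-submit on the master node,
                # so the spark-yarn classes come from the cluster's Spark
                # install instead of the fat jar, avoiding the
                # ClassNotFoundException in the question.
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit',
                         '--master', 'yarn', '--deploy-mode', 'client',
                         '--class', 'com.test.MainClass',
                         's3://bucket/jars/application/some-app.jar'
                         # application arguments follow the jar path
                         ]
            }
        }],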
Lamanus