I am new to Spark and Airflow and am trying to understand how I can use Airflow to kick off a job along with the parameters that job needs. On the edge node I run a specific job for specific dates with the spark-submit command below:
EXECUTORS_MEM=4G
EXECUTORS_NUM=300
STARTDAY=20180401
ENDDAY=20180401
QUEUE=m
jobname=x
/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client --num-executors $EXECUTORS_NUM --executor-memory $EXECUTORS_MEM --executor-cores 1 --driver-memory 8G --queue $QUEUE --class test.core.Driver --jars $JARS2 abc.jar --config=/a/b/c/test.config --appName=abc --sparkMaster=yarnclient --job=$jobname --days=$STARTDAY,$ENDDAY
So could you please let me know whether creating a .py file similar to the code below is the way to run the job in Airflow? Is this how you are supposed to run a job and pass parameters?
How do I pass parameters the way I did when launching the job on the edge node?
If I automate the job to run daily, I would like the start date to be "t-7": if today's date is 4/20/2018, the start date passed to the job has to be 4/13/2018. How do I achieve that?
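From the docs it looks like the execution-date macros (ds, macros.ds_add, macros.ds_format) might be the way to build that t-7 window, so I tried sketching it with a throwaway BashOperator first (the DAG and task names here are just made up by me, not verified):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('date_window_sketch', start_date=datetime(2018, 4, 1), schedule_interval='@daily')

# {{ ds }} is the run's execution date as YYYY-MM-DD; macros.ds_add shifts it by N days
# and macros.ds_format rewrites it as YYYYMMDD, which is the format my job expects.
print_window = BashOperator(
    task_id='print_window',
    bash_command=(
        'echo STARTDAY={{ macros.ds_format(macros.ds_add(ds, -7), "%Y-%m-%d", "%Y%m%d") }} '
        'ENDDAY={{ macros.ds_format(ds, "%Y-%m-%d", "%Y%m%d") }}'
    ),
    dag=dag,
)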
###############.py file example ##############
**********************************************
import os
import sys

from airflow.operators.bash_operator import BashOperator

# Point SPARK_HOME at the Spark root (not its bin/ directory) and put bin/ on the path.
os.environ['SPARK_HOME'] = '/home/spark/spark-2.1.0-bin-hadoop2.6'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
and add the operator:
spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class test.core.Driver abc.jar',
    params={'EXECUTORS_MEM': '4G', 'EXECUTORS_NUM': '300', 'QUEUE': 'm', 'jobname': 'x'},
    dag=dag
)
################### EOF ######################
**********************************************
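To make the question concrete, my current guess is that the params dict only takes effect where bash_command references it through Jinja, and that using the full path to spark-submit is how I would pick a Spark build in a different location. Is something like the sketch below the right direction (again, my own guess, not verified)?

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('spark_bash_sketch', start_date=datetime(2018, 4, 1), schedule_interval='@daily')

# Guess: params entries are only used where bash_command references them as
# {{ params.<key> }}; the absolute path to spark-submit selects the Spark build to run.
spark_task = BashOperator(
    task_id='spark_java',
    bash_command=(
        '/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit '
        '--master yarn --deploy-mode client '
        '--num-executors {{ params.EXECUTORS_NUM }} '
        '--executor-memory {{ params.EXECUTORS_MEM }} '
        '--executor-cores 1 --driver-memory 8G '
        '--queue {{ params.QUEUE }} '
        '--class test.core.Driver abc.jar '
        '--config=/a/b/c/test.config '
        '--job={{ params.jobname }} '
        '--days={{ macros.ds_format(macros.ds_add(ds, -7), "%Y-%m-%d", "%Y%m%d") }},'
        '{{ macros.ds_format(ds, "%Y-%m-%d", "%Y%m%d") }}'
    ),
    params={'EXECUTORS_MEM': '4G', 'EXECUTORS_NUM': '300', 'QUEUE': 'm', 'jobname': 'x'},
    dag=dag,
)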
New .py file - please correct me if anything is wrong:
- How do I pass the params so the job runs against a Spark version installed in a different path?
- How do I pass a jar that is in a different path?
- Is passing the parameters as shown below the right way to do it?
- Is it possible to manually pass a certain start and end date for the job to run?
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2017, 1, 1)

args = {
    'owner': 'airflow',
    'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)

_config = {
    'config': '/a/b/c/d/prod.config',
    'master': 'yarn',
    'deploy-mode': 'client',
    'sparkMaster': 'yarnclient',
    'class': 'core.Driver',
    'driver_classpath': 'parquet.jar',
    'jars': '/a/b/c/d/test.jar',
    'total_executor_cores': 4,
    'executor_cores': 1,
    'EXECUTORS_MEM': '8G',
    'EXECUTORS_NUM': 500,
    'executor-cores': '1',
    'driver-memory': '8G',
    'JOB_NAME': ' ',
    'QUEUE': ' ',
    'verbose': ' ',
    'start_date': ' ',
    'end_date': ' '
}

operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    **_config
)
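Alternatively, going by the operator's docstring, I wonder if the flags should map onto the operator's own arguments more like the sketch below, with --master, --deploy-mode and --queue configured in the spark_default connection's extra and the jar's own switches passed through application_args (assuming application_args is templated in my Airflow version). Again, just my guess, please correct it:

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone

args = {'owner': 'airflow', 'start_date': timezone.datetime(2018, 4, 1)}
dag = DAG('spark_submit_sketch', default_args=args, schedule_interval='@daily')

# Guess: standard spark-submit flags map to operator arguments; --master, --deploy-mode
# and --queue would live in the spark_default connection's extra; anything the jar itself
# parses (--config, --appName, --job, --days) would go through application_args.
operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_default',
    application='/a/b/c/d/test.jar',   # jar in a different path (guessing this is where the app jar goes)
    java_class='test.core.Driver',
    num_executors=300,
    executor_cores=1,
    executor_memory='4G',
    driver_memory='8G',
    verbose=True,
    application_args=[
        '--config=/a/b/c/test.config',
        '--appName=abc',
        '--sparkMaster=yarnclient',
        '--job=x',
        '--days={{ macros.ds_format(macros.ds_add(ds, -7), "%Y-%m-%d", "%Y%m%d") }},'
        '{{ macros.ds_format(ds, "%Y-%m-%d", "%Y%m%d") }}',
    ],
    dag=dag,
)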