4

I have Spark and Airflow cluster, I want to send a spark job from Airflow container to Spark container. But I am new about Airflow and I don't know which configuration I need to perform. I copied spark_submit_operator.py under the plugins folder.

from airflow import DAG

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta

    args = {
        'owner': 'airflow',
        'start_date': datetime(2018, 7, 31)
    }
    dag = DAG('spark_example_new', default_args=args, schedule_interval="*/10 * * * *")

    operator = SparkSubmitOperator(
        task_id='spark_submit_job',
        conn_id='spark_default',
        java_class='Simple',
        application='/spark/abc.jar',
        total_executor_cores='1',
        executor_cores='1',
        executor_memory='2g',
        num_executors='1',
        name='airflow-spark-example',
        verbose=False,
        driver_memory='1g',
        application_args=["1000"],
        conf={'master':'spark://master:7077'},
        dag=dag,
    )

master is the hostname of our Spark Master container. When I run the dag, it produce following error:

[2018-09-20 05:57:46,637] {{models.py:1569}} INFO - Executing <Task(SparkSubmitOperator): spark_submit_job> on 2018-09-20T05:57:36.756154+00:00
[2018-09-20 05:57:46,637] {{base_task_runner.py:124}} INFO - Running: ['bash', '-c', 'airflow run spark_example_new spark_submit_job 2018-09-20T05:57:36.756154+00:00 --job_id 4 --raw -sd DAGS_FOLDER/firstJob.py --cfg_path /tmp/tmpn2hznb5_']
[2018-09-20 05:57:47,002] {{base_task_runner.py:107}} INFO - Job 4: Subtask spark_submit_job [2018-09-20 05:57:47,001] {{settings.py:174}} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-09-20 05:57:47,312] {{base_task_runner.py:107}} INFO - Job 4: Subtask spark_submit_job [2018-09-20 05:57:47,311] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2018-09-20 05:57:47,428] {{base_task_runner.py:107}} INFO - Job 4: Subtask spark_submit_job [2018-09-20 05:57:47,428] {{models.py:258}} INFO - Filling up the DagBag from /usr/local/airflow/dags/firstJob.py
[2018-09-20 05:57:47,447] {{base_task_runner.py:107}} INFO - Job 4: Subtask spark_submit_job [2018-09-20 05:57:47,447] {{cli.py:492}} INFO - Running <TaskInstance: spark_example_new.spark_submit_job 2018-09-20T05:57:36.756154+00:00 [running]> on host e6dd59dc595f
[2018-09-20 05:57:47,471] {{logging_mixin.py:95}} INFO - [2018-09-20 05:57:47,470] {{spark_submit_hook.py:283}} INFO - Spark-Submit cmd: ['spark-submit', '--master', 'yarn', '--conf', 'master=spark://master:7077', '--num-executors', '1', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'airflow-spark-example', '--class', 'Simple', '/spark/ugur.jar', '1000']

[2018-09-20 05:57:47,473] {{models.py:1736}} ERROR - [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1633, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/spark_submit_operator.py", line 168, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/spark_submit_hook.py", line 330, in submit
    **kwargs)
  File "/usr/local/lib/python3.6/subprocess.py", line 709, in __init__
    restore_signals, start_new_session)
  File "/usr/local/lib/python3.6/subprocess.py", line 1344, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'

It's running command:

Spark-Submit cmd: ['spark-submit', '--master', 'yarn', '--conf', 'master=spark://master:7077', '--num-executors', '1', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'airflow-spark-example', '--class', 'Simple', '/spark/ugur.jar', '1000']

but I didn't use yarn.

toydarian
  • 4,246
  • 5
  • 23
  • 35
ugur
  • 400
  • 6
  • 20

2 Answers2

2

If you use SparkSubmitOperator the connection to master will be set as "Yarn" by default regardless master which you set on your python code, however, you can override master by specifying conn_id through its constructor with the condition you have already created the aforementioned conn_id at "Admin->Connection" menu in Airflow Web Interface. I hope it can help you.

Andrzej Sydor
  • 1,373
  • 4
  • 13
  • 28
Rudy Antony
  • 21
  • 2
  • 5
  • I have done that step, I also connected the spark-master container to airflow_network. But when I trigger a jib from Airflow the spark master UI does not show the job running – moe_ Apr 04 '22 at 11:47
0

I guess you need to set master in extra options in your connections for this spark conn id (spark_default).By default is yarn, so try{"master":"your-con"} Also does your airflow user has spark-submit in the classpath?, from logs looks like it doesnt.

Tomasz Krol
  • 596
  • 6
  • 23
  • In my cluster, spark is running in a container separately from airflow container. Therefore, airflow container does not contain spark-submit. How can I add spark-submit which is found in another container to classpath of airflow container? – ugur Sep 24 '18 at 12:13
  • You have undone quite a convincingly improving edit to your post. Please explain your reason to @AndrzejSydor. – Yunnosch Dec 16 '20 at 21:10
  • Which one? @Yunnosch – Andrzej Sydor Dec 17 '20 at 08:14
  • 1
    @AndrzejSydor Sorry for confusing you. My comment addressed OP and only mentioned you. I used the ping syntax so that you are notified. I want OP to explain to you why they undid your decent edit and of course want them to improve the answer themselves. – Yunnosch Dec 17 '20 at 08:33