I am new to Airflow, and I am trying to schedule a PySpark job with Airflow deployed in Docker containers. Here is my DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta

spark_master = "spark://spark:7077"
spark_app_name = "Spark Hello World"
now = datetime.now()

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(now.year, now.month, now.day),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG(
    dag_id="spark-test",
    description="This DAG runs a simple Pyspark app.",
    default_args=default_args,
    schedule_interval=timedelta(1)
)

t1 = DummyOperator(task_id="start", dag=dag)

# Task 2: check that the file exists
t2 = BashOperator(
    task_id='check_file_exists',
    bash_command='shasum /usr/local/spark/app/first.py',
    retries=2,
    retry_delay=timedelta(seconds=15),
    dag=dag
)

t3 = SparkSubmitOperator(
    task_id="spark_job",
    application='/usr/local/spark/app/first.py',
    name=spark_app_name,
    conn_id="spark_default",
    conf={"spark.master": spark_master},
    dag=dag
)

t1 >> t2 >> t3
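Task t3 submits through the spark_default connection; the log further down shows that this connection currently points at Host: yarn, which is why --master yarn appears in the built command. For reference, one way to point spark_default at the standalone master instead is Airflow's AIRFLOW_CONN_<CONN_ID> environment-variable convention (a sketch only; where the variable is set depends on the docker-compose setup):

# Hypothetical: define the spark_default connection as an environment
# variable in the Airflow containers. Airflow parses this URI as
# conn_type=spark, host=spark, port=7077.
export AIRFLOW_CONN_SPARK_DEFAULT='spark://spark:7077'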
My Python script, first.py, is:
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("app")
    sc = SparkContext(conf=conf)

    # Word count over the input file
    text_file = sc.textFile("/usr/local/spark/resources/data/Loren.txt")
    counts = text_file.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile("/usr/local/spark/resources/data/loren_counts_task4")
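Note that when the job succeeds, saveAsTextFile creates a directory of part files rather than a single file, so the output should look something like this (illustrative listing only):

# Illustrative: saveAsTextFile writes one part file per partition plus a
# _SUCCESS marker into the target directory.
ls /usr/local/spark/resources/data/loren_counts_task4
# part-00000  part-00001  _SUCCESS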
The error I'm getting is FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'. Here is the full task log:
Reading local file: /usr/local/airflow/logs/spark-test/spark_job/2021-07-09T20:46:19.130980+00:00/2.log
[2021-07-09 20:47:50,119] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: spark-test.spark_job 2021-07-09T20:46:19.130980+00:00 [queued]>
[2021-07-09 20:47:50,151] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: spark-test.spark_job 2021-07-09T20:46:19.130980+00:00 [queued]>
[2021-07-09 20:47:50,152] {{taskinstance.py:866}} INFO -
--------------------------------------------------------------------------------
[2021-07-09 20:47:50,152] {{taskinstance.py:867}} INFO - Starting attempt 2 of 2
[2021-07-09 20:47:50,152] {{taskinstance.py:868}} INFO -
--------------------------------------------------------------------------------
[2021-07-09 20:47:50,165] {{taskinstance.py:887}} INFO - Executing <Task(SparkSubmitOperator): spark_job> on 2021-07-09T20:46:19.130980+00:00
[2021-07-09 20:47:50,169] {{standard_task_runner.py:53}} INFO - Started process 19335 to run task
[2021-07-09 20:47:50,249] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: spark-test.spark_job 2021-07-09T20:46:19.130980+00:00 [running]> 9b6d4f74ee93
[2021-07-09 20:47:50,293] {{logging_mixin.py:112}} INFO - [2021-07-09 20:47:50,292] {{base_hook.py:84}} INFO - Using connection to: id: spark_default. Host: yarn, Port: None, Schema: None, Login: None, Password: None, extra: XXXXXXXX
[2021-07-09 20:47:50,294] {{logging_mixin.py:112}} INFO - [2021-07-09 20:47:50,294] {{spark_submit_hook.py:323}} INFO - Spark-Submit cmd: spark-submit --master yarn --conf spark.master=spark://spark:7077 --name Spark Hello World --queue root.default /usr/local/spark/app/first.py
[2021-07-09 20:47:50,301] {{taskinstance.py:1128}} ERROR - [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/spark_submit_operator.py", line 187, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/spark_submit_hook.py", line 393, in submit
    **kwargs)
  File "/usr/local/lib/python3.7/subprocess.py", line 800, in __init__
    restore_signals, start_new_session)
  File "/usr/local/lib/python3.7/subprocess.py", line 1551, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit': 'spark-submit'
[2021-07-09 20:47:50,304] {{taskinstance.py:1170}} INFO - All retries failed; marking task as FAILED.dag_id=spark-test, task_id=spark_job, execution_date=20210709T204619, start_date=20210709T204750, end_date=20210709T204750
[2021-07-09 20:48:00,096] {{logging_mixin.py:112}} INFO - [2021-07-09 20:48:00,095] {{local_task_job.py:103}} INFO - Task exited with return code 1
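The Spark-Submit cmd line in the log looks well formed; the FileNotFoundError is raised by Python's subprocess module because the spark-submit binary itself cannot be found inside the container that executes the task. A quick way to verify this (a sketch; airflow_worker is a placeholder for whatever name docker-compose gives the Airflow container):

# Placeholder container name; substitute the actual Airflow container.
docker exec -it airflow_worker which spark-submit
docker exec -it airflow_worker spark-submit --version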
I ran spark-submit manually on the Spark container and it works perfectly, so I am not sure what is wrong.
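For completeness, this is roughly the manual test that succeeds (a sketch; "spark" is the container/service name implied by spark://spark:7077, and the paths match the DAG above):

# Approximate manual run inside the Spark container.
docker exec -it spark spark-submit --master spark://spark:7077 /usr/local/spark/app/first.py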