
I have both Airflow 2 (the official image) and Apache Spark running in a docker-compose pipeline.

I would like to execute a DAG that triggers a Spark script via the SparkSubmitOperator (https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/spark_submit_operator/index.html), but the task fails. In the Airflow webserver I can see the following log:

*** Reading local file: /opt/airflow/logs/timetable/spark-job/2021-05-16T07:18:57.288610+00:00/1.log
[2021-05-16 07:18:58,856] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: timetable.spark-job 2021-05-16T07:18:57.288610+00:00 [queued]>
[2021-05-16 07:18:58,906] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: timetable.spark-job 2021-05-16T07:18:57.288610+00:00 [queued]>
[2021-05-16 07:18:58,906] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-16 07:18:58,906] {taskinstance.py:1069} INFO - Starting attempt 1 of 4
[2021-05-16 07:18:58,906] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-16 07:18:58,926] {taskinstance.py:1089} INFO - Executing <Task(SparkSubmitOperator): spark-job> on 2021-05-16T07:18:57.288610+00:00
[2021-05-16 07:18:58,941] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'timetable', 'spark-job', '2021-05-16T07:18:57.288610+00:00', '--job-id', '164', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg-path', '/tmp/tmp0opwomfp', '--error-file', '/tmp/tmpl4ctddqc']
[2021-05-16 07:18:58,935] {standard_task_runner.py:52} INFO - Started process 69 to run task
[2021-05-16 07:18:58,941] {standard_task_runner.py:77} INFO - Job 164: Subtask spark-job
[2021-05-16 07:18:59,000] {logging_mixin.py:104} INFO - Running <TaskInstance: timetable.spark-job 2021-05-16T07:18:57.288610+00:00 [running]> on host 94b160a4f0d4
[2021-05-16 07:18:59,053] {taskinstance.py:1283} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=timetable
AIRFLOW_CTX_TASK_ID=spark-job
AIRFLOW_CTX_EXECUTION_DATE=2021-05-16T07:18:57.288610+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-16T07:18:57.288610+00:00
[2021-05-16 07:18:59,055] {base.py:78} INFO - Using connection to: id: spark_default. Host: spark, Port: 8080, Schema: , Login: None, Password: None, extra: None
[2021-05-16 07:18:59,057] {spark_submit.py:364} INFO - Spark-Submit cmd: spark-submit --master spark:8080 --name arrow-spark spark-app.py
[2021-05-16 07:18:59,145] {spark_submit.py:526} INFO - JAVA_HOME is not set
[2021-05-16 07:18:59,156] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/apache/spark/operators/spark_submit.py", line 183, in execute
    self._hook.submit(self._application)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/apache/spark/hooks/spark_submit.py", line 455, in submit
    self._mask_cmd(spark_submit_cmd), returncode
airflow.exceptions.AirflowException: Cannot execute: spark-submit --master spark:8080 --name arrow-spark spark-app.py. Error code is: 1.
[2021-05-16 07:18:59,159] {taskinstance.py:1532} INFO - Marking task as UP_FOR_RETRY. dag_id=timetable, task_id=spark-job, execution_date=20210516T071857, start_date=20210516T071858, end_date=20210516T071859
[2021-05-16 07:18:59,196] {local_task_job.py:146} INFO - Task exited with return code 1

As the line airflow.exceptions.AirflowException: Cannot execute: spark-submit --master spark:8080 --name arrow-spark spark-app.py. Error code is: 1. is not very informative, I don't know how to proceed. It seems the connection to Spark is not initialized properly. Hence my question:

How can I trigger a Spark job from an Airflow DAG via the SparkSubmitOperator when Spark runs in a different Docker container?

My setup:

daniel@Yoga:~/Projekte/db/airflow$ docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS                    PORTS                                                 NAMES
5e50523ee1ad   apache/airflow:2.0.2   "/usr/bin/dumb-init …"   23 minutes ago   Up 23 minutes             8080/tcp                                              airflowWorker
1da21c7545b3   apache/airflow:2.0.2   "/usr/bin/dumb-init …"   23 minutes ago   Up 23 minutes (healthy)   0.0.0.0:8081->8080/tcp, :::8081->8080/tcp             airflowWebserver
0fa61a4d0ce0   apache/airflow:2.0.2   "/usr/bin/dumb-init …"   23 minutes ago   Up 23 minutes (healthy)   0.0.0.0:5555->5555/tcp, :::5555->5555/tcp, 8080/tcp   airflowFlower
8a09bafae90a   apache/airflow:2.0.2   "/usr/bin/dumb-init …"   23 minutes ago   Up 23 minutes             8080/tcp                                              airflowScheduler
bbe5eb2111a7   postgres:13            "docker-entrypoint.s…"   23 minutes ago   Up 23 minutes (healthy)   5432/tcp                                              airflowPostgres
6b03d5411e52   redis:latest           "docker-entrypoint.s…"   23 minutes ago   Up 23 minutes (healthy)   0.0.0.0:6380->6379/tcp, :::6380->6379/tcp             airflowRedis
23457a819731   bitnami/spark:3        "/opt/bitnami/script…"   48 minutes ago   Up 48 minutes                                                                   spark_worker2
b2df035e216e   bitnami/spark:3        "/opt/bitnami/script…"   48 minutes ago   Up 48 minutes             0.0.0.0:8080->8080/tcp, :::8080->8080/tcp             spark
452d0cec2a0c   bitnami/spark:3        "/opt/bitnami/script…"   48 minutes ago   Up 48 minutes                                                                   spark_worker1

All containers are in the same Docker network, so they should be able to reach each other by name.
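
As a quick sanity check, the shared network can be inspected from the host. This is only a sketch and assumes the Compose project creates a network named airflow_default; the actual name depends on the project/directory name:

# List which containers are attached to the shared Compose network
docker network inspect airflow_default --format '{{range .Containers}}{{.Name}} {{end}}'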

My DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'dw',
    'start_date': datetime(2021, 5, 9),
    "retries": 3,
    "retry_delay": timedelta(minutes = 1)
}

dag = DAG('timetable', description = 'spark test', catchup = False, schedule_interval = "@hourly", default_args = default_args)

s1 = SparkSubmitOperator(
    task_id = "spark-job",
    application = "spark-app.py",
    conn_id = "spark_default",
    dag = dag
)

As can be seen, conn_id is set to spark_default. This connection is defined in the Airflow webserver under Admin > Connections. I have configured it as follows:

[Screenshot of the spark_default connection; per the task log above: Host spark, Port 8080, empty Schema, Login, Password and Extra]
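
For reference, an equivalent connection could also be created from the command line. The following is only a sketch that mirrors the values reported in the task log above (connection type spark, host spark, port 8080, no login or extra):

# Create the spark_default connection with the values shown in the log
# (an already existing connection would have to be deleted first with
#  `airflow connections delete spark_default`)
airflow connections add spark_default \
    --conn-type spark \
    --conn-host spark \
    --conn-port 8080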

The script I want to execute via Spark is a very simple dummy script and is located in the same folder as the DAG definition:

from pyspark import SparkContext

# Dummy job: create a local SparkContext, parallelize the numbers 0-9 and count them.
sc = SparkContext("local", "First App")

rdd = sc.parallelize(range(10))

rdd.count()

Several similar threads have dealt with this problem, but I did not find a suitable solution in them.

Requin
  • Hi @Requin, did you link your Airflow container to the Spark container using `--link spark`? You are giving the container name directly in the HOST variable (`spark:8080`); it won't resolve unless you link the two containers. – sai May 16 '21 at 07:56
  • Hello @sai, no, I did not specify the link option. In my understanding it suffices to have both containers in the same network so that they can find each other by name: `By default Compose sets up a single network for your app. Each container for a service joins the default network and is both reachable by other containers on that network, and discoverable by them at a hostname identical to the container name.` (https://docs.docker.com/compose/networking/) – Requin May 16 '21 at 08:01
  • Given you are not sure about the cause of your error, you might want to make sure the container is able to find the job. I have the feeling the SparkSubmitOperator is unable to find your file. Also note that in Python the convention is to use snake_case, so `spark-app.py` would be `spark_app.py`. I don't think that is the reason for the error, but we shouldn't exclude anything while you are unsure of the cause. – Jorrick Sleijster May 16 '21 at 09:49
  • To continue on my previous comment: the SparkSubmitOperator opens a shell (terminal) in which it runs the `spark-submit --master spark:8080 --name arrow-spark spark-app.py` command. You might want to try running just that from inside your Airflow Docker image to get it to work. – Jorrick Sleijster May 16 '21 at 09:57
  • Hello @JorrickSleijster, you are asking the right questions! You made me aware of the following: 1. I am not sure whether the script spark_app.py is found. In the default Airflow docker-compose (which I use), a shared volume is defined for the DAGs: `volumes: - ./dags:/opt/airflow/dags`. spark_app.py is in the same folder as the DAGs, but will it be found? – Requin May 16 '21 at 10:51
  • 2. When I log in to the Airflow worker container, I can execute spark-submit: `daniel@Yoga:~/Projekte$ docker exec -it airflowWorker bash` `default@94b160a4f0d4:/app$ cd /opt/airflow/dags/` `default@94b160a4f0d4:/opt/airflow/dags$ ls` `dag.py functions.py spark_app.py` `default@94b160a4f0d4:/opt/airflow/dags$ spark-submit --master spark:8080 --name arrow-spark spark-app.py` `JAVA_HOME is not set` – Requin May 16 '21 at 10:52
  • So I get back `JAVA_HOME is not set` - can this be the cause of the whole problem? The same message appears even if I pass a file that does not exist, so at this point it does not even get to the file to be executed but stops before that. I am also not sure _where_ Java is not found: in the Airflow container or in the Spark container? The Airflow container does not seem to have Java installed, while JAVA_HOME is properly set in the Spark container. – Requin May 16 '21 at 11:04
  • I think that is the issue: the Airflow container, which runs spark-submit, has no Java installed and no JAVA_HOME set. – floating_hammer May 16 '21 at 16:06

1 Answer


I finally managed to get it running by installing Java on the airflowWorker container, as suggested by @floating_hammer.

See "How to install java in an airflow container using docker-compose.yaml" for details.
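
For completeness, here is a minimal sketch of what that boils down to. It assumes the Debian-based apache/airflow:2.0.2 image, the openjdk-11-jre-headless package and the usual Debian install path; in practice the installation is better baked into a custom image referenced from docker-compose.yaml than patched into a running container:

# Install a headless JRE inside the running worker container (apt-get needs root)
docker exec -u root airflowWorker bash -c \
    "apt-get update && apt-get install -y --no-install-recommends openjdk-11-jre-headless"

# If spark-submit still complains about JAVA_HOME, set it for the worker service,
# e.g. in its environment section of docker-compose.yaml (the path below is an
# assumption and may differ per architecture):
#   JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64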

Requin