
I wrote a very simple Airflow DAG as follows:

import airflow
from airflow import DAG
from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator
from datetime import timedelta
from datetime import datetime as dt

default_args = {
    'owner': 'zxy',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'my_first_dag',
    default_args=default_args,
    #start_date=dt.strptime('2018-05-16', '%Y-%m-%d'),
    start_date=airflow.utils.dates.days_ago(2),
    description='My First Airflow DAG',
    schedule_interval=timedelta(minutes=5))

sql = r'''select count(u) from some_table where time=20180513 and platform='iOS' '''
t1 = SparkSqlOperator(task_id='Count_Ads_U', conn_id='spark_default', sql=sql, dag=dag)

Then I ran airflow scheduler to schedule the job.

The job returned the correct count, but it kept printing blank log lines like the following and therefore could not stop successfully:

[2018-05-16 06:33:07,505] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,505] {spark_sql_hook.py:142} INFO - b'18/05/16 06:33:07 INFO spark.SparkContext: Successfully stopped SparkContext\n'
[2018-05-16 06:33:07,506] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,506] {spark_sql_hook.py:142} INFO - b'18/05/16 06:33:07 INFO util.ShutdownHookManager: Shutdown hook called\n'
[2018-05-16 06:33:07,506] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,506] {spark_sql_hook.py:142} INFO - b'18/05/16 06:33:07 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-fbb4089c-338b-4b0e-a394-975f45b307a8\n'
[2018-05-16 06:33:07,509] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,509] {spark_sql_hook.py:142} INFO - b'18/05/16 06:33:07 INFO util.ShutdownHookManager: Deleting directory /apps/data/spark/temp/spark-f6b6695f-24e4-4db0-ae2b-29b6836ab9c3\n'
[2018-05-16 06:33:07,902] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,902] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,902] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,902] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,902] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,903] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,904] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,904] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''
[2018-05-16 06:33:07,904] {base_task_runner.py:98} INFO - Subtask: [2018-05-16 06:33:07,903] {spark_sql_hook.py:142} INFO - b''

The empty log lines went on endlessly until I stopped the scheduler with Ctrl+C.

The Airflow version is v1.9.0.

Derrick Zhang
  • @tobi6 Indeed the subprocess seems to keep running, but I fixed it in a different way; you can check it out in my own answer below, although I still don't know why the subprocess didn't stop after successful execution. – Derrick Zhang May 16 '18 at 10:06

1 Answer


Problem solved.

This is caused by a byte literal vs. string literal problem if you are using Python 3.x (line 146 of https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py):

for line in iter(self._sp.stdout.readline, ''):
    self.log.info(line)

The sentinel passed to iter is '', the empty string literal. But the lines read from stdout are byte strings rather than text strings (see this post for reference: What does the 'b' character do in front of a string literal?), as you can tell from the b prefix on each log line. At end of stream readline therefore returns b'', which never compares equal to the '' sentinel, so the for loop never ends.
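
Here is a minimal, self-contained sketch of that behaviour, using io.BytesIO as a stand-in for the subprocess's binary stdout (not the actual Airflow code):

import io

# A binary stream, like the stdout of a subprocess opened in binary mode.
stream = io.BytesIO(b'line 1\nline 2\n')

print(stream.readline())  # b'line 1\n'
print(stream.readline())  # b'line 2\n'
print(stream.readline())  # b'' once the stream is exhausted
print(b'' == '')          # False -- the bytes EOF marker never equals the str sentinel

# With a matching bytes sentinel, the same kind of loop terminates as expected:
stream.seek(0)
for line in iter(stream.readline, b''):
    print(line)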

I fixed the problem by replacing '' with b''.
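
With that one change, the loop terminates once the subprocess's stdout is exhausted:

for line in iter(self._sp.stdout.readline, b''):
    self.log.info(line)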

Derrick Zhang