
I need to retrieve the output of a bash command (the size of a file) in an SSHOperator, and use that value as a condition to branch to other tasks. I'm using XCom to retrieve the value and a BranchPythonOperator to handle the decision, but I've been unsuccessful: the SSHOperator doesn't seem to push the value to XCom.

The following is my code:

```python
# Required packages to execute DAG

from __future__ import print_function
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

log = logging.getLogger(__name__)

def decision_function(**context):
    ti = context['ti']
    fileSize = ti.xcom_pull(task_ids='get_param')
    log.info('File size is: {}'.format(fileSize))
    if fileSize >= 800000:
        return 'good_path'
    else:
        return 'bad_path'

# DAG parameters
default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 17, 4, 15),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': False,
    'email_on_retry': False,
    'provide_context': True,
    'orientation': 'LR'  # or TB, RL, BT
}

# create DAG object with name and default_args
with DAG(
        'a_param',
        schedule_interval=None,
        description='params',
        default_args=default_args
        ) as dag:

    # Define tasks
    begin = DummyOperator(
        task_id='begin',
        dag=dag
    )

    get_param = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='get_param',
        xcom_push=True,
        command="ls -ltr /tmp/adobegc.log | awk '{print $5}'",
        dag=dag
    )

    check_file = BranchPythonOperator(
        task_id='check_file',
        python_callable=decision_function,
        provide_context=True,
        dag=dag
    )

    good_path = DummyOperator(
        task_id='good_path',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    bad_path = DummyOperator(
        task_id='bad_path',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    begin >> get_param >> check_file
    check_file >> good_path
    check_file >> bad_path
```

The check_file task fails with the following log:

```
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file /usr/local/lib/airflow/airflow/contrib/operators/ssh_operator.py:75: PendingDeprecationWarning: Invalid arguments were passed to SSHOperator (task_id: get_param). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file *args: ()
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file **kwargs: {'xcom_push': True}
[2020-10-12 16:58:20,573] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file   super(SSHOperator, self).__init__(*args, **kwargs)
[2020-10-12 16:58:20,906] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file Running <TaskInstance: a_param.check_file 2020-10-12T16:55:18.312240+00:00 [running]> on host airflow-worker-9b6fbd84c-l4jbs
[2020-10-12 16:58:20,990] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file [2020-10-12 16:58:20,989] {a_param.py:25} INFO - File size is: None
[2020-10-12 16:58:20,990] {taskinstance.py:1135} ERROR - '>=' not supported between instances of 'NoneType' and 'int'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 141, in execute
    branch = super(BranchPythonOperator, self).execute(context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/a_param.py", line 26, in decision_function
    if fileSize >= 800000:
TypeError: '>=' not supported between instances of 'NoneType' and 'int'
```

get_param task log snippet:

```
[2020-10-12 16:57:12,113] {base_task_runner.py:113} INFO - Job 111303: Subtask get_param [2020-10-12 16:57:12,112] {ssh_operator.py:109} INFO - Running command: ls -ltr /tmp/adobegc.log | awk '{print $5}'
[2020-10-12 16:57:12,549] {base_task_runner.py:113} INFO - Job 111303: Subtask get_param [2020-10-12 16:57:12,547] {ssh_operator.py:143} INFO - 516752
```
    Have a look in [this question](https://stackoverflow.com/questions/42974051/how-to-retrieve-a-value-from-airflow-xcom-pushed-via-sshexecuteoperator), could be helpful. – Philipp Johannis Oct 13 '20 at 06:32
  • @PhilippJohannis thanks for this, I changed `xcom_push` argument in my SSHOperator to `do_xcom_push`. Checking the xcom page, I'm not getting the expected result. What I'm getting is `key: return_value ; Value:ODAwMAo=`. I'm expecting the file size under Value. – comet Oct 14 '20 at 09:41
  • 1
    Value seems to be base64 encoded, you can test it [here](https://www.base64decode.org/) - it would be `8000`. – Philipp Johannis Oct 14 '20 at 10:33
  • @PhilippJohannis yea I see. Why does it return it base64 encoded? Is there a way to make Airflow return string value or int value? – comet Oct 14 '20 at 12:58
  • 2
    Actually the Operator is written like that: `return b64encode(agg_stdout).decode('utf-8')` - [source](https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/operators/ssh_operator.html) I guess you can just decode it again once you read it the value agian. – Philipp Johannis Oct 14 '20 at 13:21
  • 1
    Yes that's what I ended up doing. Thanks for your help. For those who want to know this is what I did: `int(base64.b64decode(fileSize).decode('utf-8'))` – comet Oct 14 '20 at 15:36
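Putting the comment thread together: in this Airflow version the SSHOperator's flag is `do_xcom_push=True` (not `xcom_push`), and the operator pushes its aggregated stdout base64-encoded, so the puller has to decode it before comparing. A minimal sketch of the corrected callable (the `get_param` task id and 800000 threshold are from the question):

```python
import base64

def decision_function(**context):
    ti = context['ti']
    # SSHOperator (with do_xcom_push=True) pushes stdout base64-encoded,
    # e.g. 'NTE2NzUyCg==' for '516752\n', so decode before comparing.
    encoded = ti.xcom_pull(task_ids='get_param')
    file_size = int(base64.b64decode(encoded).decode('utf-8'))
    if file_size >= 800000:
        return 'good_path'
    return 'bad_path'
```

With this callable in place, the only other change to the DAG above is replacing `xcom_push=True` with `do_xcom_push=True` in the SSHOperator, which also silences the `PendingDeprecationWarning` in the log.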
