
I just began learning Airflow, but I am finding the concept of XCom quite difficult to grasp, so I wrote a DAG like this:

from airflow import DAG
from airflow.utils.edgemodifier import Label

from datetime import datetime
from datetime import timedelta

from airflow.operators.bash import BashOperator
# The contrib paths are deprecated in Airflow 2; the SSH operator and hook
# now live in the apache-airflow-providers-ssh package
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook

# For more default arguments for a task (or for creating templates), see
# https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html#airflow.models.BaseOperator

default_args = {
    'owner': '...',
    'email': ['...'],
    'email_on_retry': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2021, 6, 10, 23, 0, 0, 0),
    
}

hook = SSHHook(
    remote_host='...',
    username='...',
    password='...',
    port=22,
)

with DAG(
    'test_dag',
    description='This is my first DAG to learn BASH operation, SSH connection, and transfer data among jobs',
    default_args=default_args,
    start_date=datetime(2021, 6, 10, 23, 0, 0, 0),
    schedule_interval="0 * * * *",
    tags=['Testing', 'Tutorial'],
) as dag:
    # Declare Tasks
    Read_my_IP = BashOperator(
        # Task ID must be a combination of alphanumeric characters, dashes, dots, and underscores
        task_id='Read_my_IP',
        # The last line written to stdout is pushed to the next task as an XCom
        bash_command="hostname -i | awk '{print $1}'",
    )

    Read_remote_IP = SSHOperator(
        task_id='Read_remote_IP',
        ssh_hook=hook,
        environment={
            'Pi_IP': Read_my_IP.xcom_pull('Read_my_IP'),
        },
        command="echo {{Pi_IP}}",
    )

    # Declare Relationship between tasks
    Read_my_IP >> Label("PI's IP address") >> Read_remote_IP

The first task ran successfully, but I could not obtain the XCom return_value from the Read_my_IP task, which is the IP address of the local machine. This might be very basic, but the documentation does not explain how to reference task_instance.

Please help me complete the XCom flow and pass the IP address from the local machine to the remote machine for further processing.

3LexW
1 Answer


The command parameter of SSHOperator is templated, so you can pull the XCom directly in the command string:

Read_remote_IP = SSHOperator(
    task_id='Read_remote_IP',
    ssh_hook=hook,
    command="echo {{ ti.xcom_pull(task_ids='Read_my_IP') }}"
)
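To make the rendering step concrete, here is a minimal stand-in (plain Python, no Airflow required) for how Airflow resolves the templated command before running it. FakeTaskInstance and render_command are illustrative names only, not Airflow API; the real xcom_pull reads from the Airflow metadata database.

```python
import re

# Stand-in for Airflow's TaskInstance, for illustration only.
class FakeTaskInstance:
    def __init__(self, xcom_store):
        self.xcom_store = xcom_store  # maps task_id -> pushed return_value

    def xcom_pull(self, task_ids):
        return self.xcom_store[task_ids]

def render_command(template, ti):
    # Crude substitute for Jinja: replace {{ ti.xcom_pull(task_ids='X') }}
    # with the value pulled for task X, as Airflow does before the
    # SSH command is executed.
    pattern = r"\{\{\s*ti\.xcom_pull\(task_ids='([^']+)'\)\s*\}\}"
    return re.sub(pattern, lambda m: str(ti.xcom_pull(m.group(1))), template)

# The pushed value for Read_my_IP is made up for the demonstration.
ti = FakeTaskInstance({'Read_my_IP': '192.168.0.17'})
command = "echo {{ ti.xcom_pull(task_ids='Read_my_IP') }}"
print(render_command(command, ti))  # echo 192.168.0.17
```

By the time the operator runs, the command is already plain text with the pulled value substituted in, which is why no explicit task_instance handling is needed in the DAG file.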

Note that you also need to explicitly ask for the XCom to be pushed from the BashOperator (see the operator description):

Read_my_IP = BashOperator(
    # Task ID must be a combination of alphanumeric characters, dashes, dots, and underscores
    task_id='Read_my_IP',
    # The last line written to stdout is pushed to the next task as an XCom
    bash_command="hostname -i | awk '{print $1}'",
    do_xcom_push=True
)
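Incidentally, the value that gets pushed is the last line the bash_command writes to stdout. You can sanity-check the awk pipeline locally with a sample line standing in for real hostname -i output (the addresses below are made up):

```shell
# Simulate `hostname -i` output; awk keeps only the first
# whitespace-separated field, which is what becomes the XCom return_value.
echo "10.0.0.5 fe80::1" | awk '{print $1}'
# prints: 10.0.0.5
```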
Elad Kalif
  • I would also like to ask whether there is a way to assign the xcom_pull result to an environment variable and use that in the command, since it can get messy as the number of variables increases. – 3LexW Jun 11 '21 at 10:11
  • @Elad, I have been facing the same issue with Airflow XCom. I am not able to pass my value from xcom_push to SQL. – Aditya Verma Sep 01 '21 at 20:55
  • ti.xcom_push(key="QueryTimeStamp_{}".format(country), value=data) doesn't seem to work when I pull the data using ti.xcom_pull(task_ids='Match_Updated_date_{}'.format(country), key='QueryTimeStamp_{}'.format(country)) – Aditya Verma Sep 01 '21 at 20:56