
I have the following DAG with two SSHExecuteOperator tasks. The first task executes a stored procedure which returns a parameter. The second task needs this parameter as an input.

Could you please explain how to pull the value that task1 pushes to XCom, so that I can use it in task2?

```python
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.models import Variable

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime.now(),
  'email': ['my@email.com'],
  'email_on_failure': True,
  'retries': 0
}

# server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin
DataQualitySSHHook = Variable.get('DataQualitySSHHook')
print('Connecting to: ' + DataQualitySSHHook)
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook)
sshHookEtl.no_host_key_check = True

# create dag
dag = DAG(
  'ed_data_quality_test-v0.0.3',  # update version whenever you change something
  default_args=default_args,
  schedule_interval="0 0 * * *",
  dagrun_timeout=timedelta(hours=24),
  max_active_runs=1)

# create tasks
task1 = SSHExecuteOperator(
  task_id='run_remote_sp_audit_batch_register',
  bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ",  # keep the space at the end
  ssh_hook=sshHookEtl,
  xcom_push=True,
  retries=0,
  dag=dag)

task2 = SSHExecuteOperator(
  task_id='run_remote_sp_audit_module_session_start',
  bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}",
  ssh_hook=sshHookEtl,
  retries=0,
  dag=dag)

# create dependencies
task1.set_downstream(task2)
```
Alexis.Rolland

2 Answers


The solution I found: when task1 executes the shell script, make sure the value you want captured in XCom is the last thing your script prints (using `echo`). With `xcom_push=True`, the operator pushes only the last line of the command's output to XCom.

I was then able to retrieve the XCom value in task2 with the following template snippet:

```
{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}
```
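To see why the value has to be printed last, here is a local sketch that uses `subprocess` in place of SSH and involves no Airflow code. It assumes, as this answer describes, that the operator keeps only the last line of stdout; `PRODUCER_SCRIPT`, `run_and_capture_last_line` and `run_consumer` are hypothetical names for illustration, and the consumer receives the value as its second argument, the same way a rendered `bash_command` would pass it to a script.

```python
import subprocess

# Hypothetical stand-in for the remote script run by task1. It may print
# progress messages, but the value to hand over must be echoed LAST.
PRODUCER_SCRIPT = """
echo "Registering batch..."
echo "Batch registered"
echo 12345
"""


def run_and_capture_last_line(script: str) -> str:
    """Run a shell script locally and return its last stdout line.

    Assumption: this mimics what SSHExecuteOperator(xcom_push=True) stores
    in XCom, i.e. only the final line of the command's output.
    """
    result = subprocess.run(
        ["sh", "-c", script], capture_output=True, text=True, check=True
    )
    lines = result.stdout.splitlines()
    return lines[-1].strip() if lines else ""


def run_consumer(batch_key: str) -> str:
    """Stand-in for task2: the pulled XCom value arrives as the second argument."""
    consumer_script = 'in_P_PROCESS=$1; in_P_BATCH_KEY=$2; echo "key=$in_P_BATCH_KEY"'
    # With `sh -c SCRIPT NAME ARG1 ARG2`, NAME becomes $0, ARG1 $1 and ARG2 $2.
    cmd = ["sh", "-c", consumer_script, "sh", "ED_DATA_QUALITY_MANUAL", batch_key]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.strip()


xcom_value = run_and_capture_last_line(PRODUCER_SCRIPT)
print(xcom_value)                # 12345
print(run_consumer(xcom_value))  # key=12345
```

If the script echoed anything after `12345` (a "done" message, for example), that line would be captured instead, which is the usual reason the pulled value looks wrong.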

Alexis.Rolland
  • Do you have an example where the bash command performs other operations after retrieving the XCom value? I would like to see one, as I have been trying and failing. – Tsume Aug 03 '18 at 17:53
  • @Tsume here is an example of a task where I grab the XCom value from a previous task: ```my_task = SSHExecuteOperator( task_id='my_task', bash_command="bash /opt/scripts/MY_SHELL_SCRIPT.sh 'Argument_1' {{ task_instance.xcom_pull(task_ids='my_previous_task') }} ", ssh_hook=sshHookEtl, retries=3, dag=dag)``` For some reason I had to keep the space at the end of the `bash_command`; otherwise the task fails. – Alexis.Rolland Aug 04 '18 at 07:51
  • @Tsume You will notice the XCom value is passed as the second argument. Here is what the shell script does to capture it: ```echo "Get process name and batch key and source" in_P_PROCESS=$1 in_P_BATCH_KEY=$2``` Each command above should be on its own line, but Stack Overflow comments do not allow line breaks in code snippets. – Alexis.Rolland Aug 04 '18 at 07:52
  • Thanks for the help, it actually helped pinpoint the issue. In my bash command I was passing in parameters with Python's .format() function, which caused the script not to work. I overcame this by splitting the command into two parts, one containing the XCom expression and the other containing what I needed from the bash script. For example: `"input=$(echo {{ task_instance.xcom_pull(task_ids='my_previous_task') }} ) && " + " echo $input {0}".format(variable_input)`. Simply concatenating the two strings resolved my issue. – Tsume Aug 08 '18 at 12:22
  • Glad it helped! – Alexis.Rolland Aug 08 '18 at 17:08
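The `.format()` problem in the last comments comes from a clash between Python's `str.format()` and Jinja: both use braces, and `str.format()` turns `{{` into `{`, so Airflow never sees a template expression. The plain-Python sketch below (no Airflow needed; `variable_input` and the task id are placeholders) shows the broken string and two ways around it: concatenation, as in the comment, and re-escaping every Jinja brace.

```python
# The Jinja expression must reach Airflow with its double braces intact so it
# can be rendered at run time.
XCOM_EXPR = "{{ task_instance.xcom_pull(task_ids='my_previous_task') }}"
variable_input = "foo"  # hypothetical value interpolated on the Python side

# Broken: str.format() treats "{{" and "}}" as escaped braces and collapses
# them to "{" and "}", so Jinja no longer sees a template expression.
broken = ("input=$(echo " + XCOM_EXPR + " ) && echo $input {0}").format(variable_input)

# Fix 1 (concatenation): call .format() only on the part without Jinja braces.
# A method call binds tighter than "+", so XCOM_EXPR is never formatted.
fixed_concat = "input=$(echo " + XCOM_EXPR + " ) && " + "echo $input {0}".format(variable_input)

# Fix 2: keep a single format string but double every Jinja brace again,
# so that "{{{{" becomes "{{" after formatting.
fixed_escaped = (
    "input=$(echo {{{{ task_instance.xcom_pull(task_ids='my_previous_task') }}}} ) "
    "&& echo $input {0}"
).format(variable_input)

print(broken)        # single braces: Jinja will not render this
print(fixed_concat)  # double braces survive
```

The same applies to f-strings: a literal `{{` has to be written as `{{{{` inside an f-string.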

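Instead of `xcom_push=True`, try `do_xcom_push=True` (the parameter name used by the newer `SSHOperator`, which replaced `SSHExecuteOperator`). It pushes all of the command's stdout to XCom under the key `return_value`.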
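Because this pushes the whole stdout rather than only its last line, the downstream side may need to pick the value out itself. A minimal sketch, assuming the XCom holds the full output and that it may be base64-encoded (as at least some versions of the SSH provider do when XCom pickling is disabled; check the version you run). `value_from_xcom` is a hypothetical helper, for example to call from a Python task.

```python
import base64


def value_from_xcom(pushed: str, base64_encoded: bool = True) -> str:
    """Return the last non-empty stdout line from an SSHOperator XCom value.

    Assumption: `pushed` is the command's full stdout, base64-encoded when
    `base64_encoded` is True.
    """
    text = base64.b64decode(pushed).decode("utf-8") if base64_encoded else pushed
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    return lines[-1] if lines else ""


# Simulated XCom value for a script that logs a message, then echoes 12345.
pushed = base64.b64encode(b"Registering batch...\n12345\n").decode("utf-8")
print(value_from_xcom(pushed))  # 12345
```

Taking the last non-empty line keeps the same convention as the accepted answer (print the value last), so the remote script does not need to change.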

Milan