I saw some similar questions about this (like this and this), but none of them answers this question.

I want to run a Python file with BashOperator, like this:

my_task = BashOperator(
    task_id='my_task',
    bash_command='python3 /opt/airflow/dags/programs/my_task.py',
)

Is there a way I can call xcom_push and xcom_pull from my_task.py?

RafaelJan

2 Answers

0

You can either switch to PythonOperator, or keep BashOperator and pass arguments to the script through the bash command using Jinja templating.

  1. PythonOperator

    from programs.my_task import my_function
    
    my_task = PythonOperator(
        task_id='my_task',
        python_callable=my_function,
    )
    

    my_task.py

    def my_function(**context):
        xcom_value = context['ti'].xcom_pull(task_ids='previous_task')
    
        context['ti'].xcom_push(key='key', value='value')  # pushed to XCom under the key 'key'
    
        return "xcom_push_value"  # the return value is also pushed to XCom (under the key 'return_value')
    
  2. Pass arguments to the python script

    my_task = BashOperator(
        task_id='my_task',
        bash_command='python3 /opt/airflow/dags/programs/my_task.py {{ ti.xcom_pull(task_ids="previous_task") }}'
    )
    

    my_task.py

    import sys

    if __name__ == '__main__':
        xcom_pulled_value = sys.argv[1]
    
        print("xcom_push_value")  # the last line of stdout is stored to XCom
    

    Alternatively, with this approach, you can use argparse.

Emma
  • Thank you very much! But if I want to use the BashOperator, your solution lets me push only one value. I want to push multiple values. – RafaelJan Aug 04 '21 at 06:48
  • It depends on how much you want to push. If it is only a few values and they are structured, you _could_ print them in CSV style, like `print("hello,world,123")`, and split the CSV in your successor operator. It is not a very clean approach, though. – Emma Aug 04 '21 at 16:56
  • And if there are more than just a few variables, you probably want to use either `PythonOperator` or a custom intermediate data store (e.g. a file). – Emma Aug 04 '21 at 17:25
  • Understood. So the answer is that we can't push more than once, but we can manually do some workaround. – RafaelJan Aug 04 '21 at 20:16
  • With `BashOperator`, yes, it is limited. With `PythonOperator` you can add multiple values. I'm not sure if you noticed, but I updated my answer to show the way to do `xcom_push` using context. That way, you can associate multiple values to unique keys. – Emma Aug 04 '21 at 20:19
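
To make the "print it in a structured style" workaround from the comments concrete: since only the last line of stdout ends up in XCom, one option is to print all the values as a single JSON line instead of CSV. This is only a minimal sketch. JSON is swapped in here in place of the CSV idea, and the function names `emit_xcom_payload` and `parse_xcom_payload` and the sample values are made up for illustration.

```python
import json


def emit_xcom_payload(values: dict) -> str:
    """Script side (my_task.py): print all values as ONE JSON line.

    This must be the last thing written to stdout, since only the
    last line is kept.
    """
    line = json.dumps(values)  # no indent, so the output stays on one line
    print(line)
    return line


def parse_xcom_payload(raw: str) -> dict:
    """Downstream side: decode the string pulled from XCom, e.g.

    raw = context['ti'].xcom_pull(task_ids='my_task')
    """
    return json.loads(raw)


if __name__ == "__main__":
    # Made-up sample values, just for illustration.
    raw = emit_xcom_payload({"rows": 123, "status": "ok"})
    print(parse_xcom_payload(raw)["rows"])
```

The downstream task still pulls a single string. It just decodes that string into several named values, which avoids the ambiguity of splitting on commas.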
0

If you need to use XComs in a BashOperator and want to pass XCom values to a Python script as arguments, I would suggest adding some argparse arguments to the script, then filling in those named arguments by Jinja-templating the bash_command. Something like this:

# Assuming you already xcom pushed the variable as "my_xcom_variable"

my_task = BashOperator(
    task_id='my_task',
    bash_command='python3 /opt/airflow/dags/programs/my_task.py --arg1={{ ti.xcom_pull(key="my_xcom_variable") }}',
)

If you are unfamiliar with argparse, you can add it at the end of the Python script like so:

# ^^^ The rest of your program is up here ^^^
# I have no idea what your python script is,
# just assuming your main program is a function called main_program()
# add as many arguments as you want and name them whatever you want

if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser()
    
    parser.add_argument('--arg1')

    args = parser.parse_args()

    main_program(args_from_the_outside=args.arg1)
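
One caveat with a templated command like this: the rendered value is pasted into the shell command as-is, so a value containing spaces gets split into several arguments unless it is quoted. Here is a small sketch of that effect, using `shlex.split` to mimic POSIX shell word-splitting. The helper name `parse_rendered_command` and the sample commands are made up for illustration.

```python
import argparse
import shlex


def parse_rendered_command(command: str):
    """Tokenize a rendered bash command and run the script's arguments through argparse."""
    tokens = shlex.split(command)  # approximates how the shell splits words
    parser = argparse.ArgumentParser()
    parser.add_argument("--arg1")
    # tokens[0:2] are "python3" and the script path; the rest are script arguments.
    args, extra = parser.parse_known_args(tokens[2:])
    return args.arg1, extra


# What the command might look like after Jinja rendering (hypothetical values).
unquoted = 'python3 my_task.py --arg1=hello world'
quoted = 'python3 my_task.py --arg1="hello world"'

print(parse_rendered_command(unquoted))  # ('hello', ['world'])
print(parse_rendered_command(quoted))    # ('hello world', [])
```

In the DAG, this would mean wrapping the template in double quotes, i.e. `--arg1="{{ ti.xcom_pull(key="my_xcom_variable") }}"`. That handles spaces, but not values that themselves contain double quotes or other shell metacharacters.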
Roman Czerwinski