86

I am really a newbie in this forum, but I have been playing with Airflow for some time for our company. Sorry if this question sounds really dumb.

I am writing a pipeline using a bunch of BashOperators. Basically, for each task, I want to simply call a REST API using curl.

This is what my pipeline looks like (very simplified version):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime
                                  
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

# NOTE: 'hostname' is assumed to be defined elsewhere; current_datetime is a
# datetime object, so it must be converted to a string before concatenating.
curl_cmd = 'curl -XPOST "' + hostname + ':8000/run?st=' + str(current_datetime) + '"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

If you notice, I am doing `current_datetime = datetime_obj.now(tz=tz.tzlocal())`. Instead, what I want here is `execution_date`.

How do I use `execution_date` directly and assign it to a variable in my Python file?

I am having this general issue of accessing args. Any help will be genuinely appreciated.

Thanks


7 Answers

59

The BashOperator's bash_command argument is a template. You can access execution_date in any template as a datetime object using the execution_date variable. In the template, you can use any jinja2 methods to manipulate it.

Using the following as your BashOperator bash_command string:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

If you just want the string equivalent of the execution date, ds will return a datestamp (YYYY-MM-DD), ds_nodash returns the same without dashes (YYYYMMDD), etc. More on macros is available in the API docs.


Your final operator would look like:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
    This is the correct answer. I would just edit it to show a completed version of the task, e.g. `t1 = BashOperator( task_id='rest-api-1', bash_command='curl -XPOST "'+hostname+':8000/run?st={{ execution_date }}"', dag=dag)` – Davos Dec 11 '17 at 03:43
    wanted to update this with `Python3` fstrings info, `command =f """..."""` does not seem to work. To get the `jinja2` templating working I think you must not use **`fstrings`** – cryanbhu Aug 26 '19 at 04:56
  • If my dag is running hourly, and currently my dag is having execution date of 2021-06-03 08:00:00, I access this date using {{execution_date}} it returns fine, but my question is now current time is 09:00:00 then the execution date will change for my dag when I will access it or it will remain same throughout. – Arpit Pruthi Jun 03 '21 at 07:30
  • @ArpitPruthi Hey Arpit, it will remain the same. 'logical_date' or as it was called in previous Airflow versions 'execution_date' denotes the start of the period assigned to your DAG. For instance if you have a DAG that runs hourly starting at 9 am, the first DAG has a period of (9:00 - 9:59), second has a period of (10:00 - 10:59) etc etc. Your first dag will have a 'logical_date' or 'execution_date' of 9 am. This will remain the same regardless of when you read it in your dag. More on that here: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval – Diana Vazquez Romo Aug 11 '22 at 17:30
49

The PythonOperator constructor takes a 'provide_context' parameter (see https://pythonhosted.org/airflow/code.html). If it's True, then it passes a number of parameters into the python_callable via kwargs. kwargs['execution_date'] is what you want, I believe.

Something like this:

from airflow.models import Variable

def python_method(ds, **kwargs):
    # Persist the execution date as an Airflow Variable for later use.
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)

I'm not sure how to do it with the BashOperator, but you might start with this issue: https://github.com/airbnb/airflow/issues/775
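
For completeness, since bash_command is itself a Jinja template (as the accepted answer describes), a minimal sketch of the BashOperator equivalent could look like this (the task id is made up):

t2 = BashOperator(
    task_id='print_execution_date',
    bash_command='echo "execution date: {{ execution_date }}"',
    dag=dag)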

    Thanks. With that approach, I will have a task t1, which will be an instance of PythonOperator with provide_context=true, which lets me use kwargs['execution_date'] where I will set and return current_datetime = 'execution_date' . Then I create my task t2: BashOperator: in which I will pull (using XCOM) and use my variables. So you see, I have to create 2 tasks. which is kinda not sexy ;) I am sure (and I hope I am right) there is a way I can access 'execution_date' directly in the python code without using PythonOperator. But I am not able to figure out how to do it :( – Roger Apr 20 '16 at 23:34
  • You could just use the native subprocess library for Python to run a bash command from within a Python function/operator. https://docs.python.org/3/library/subprocess.html – Chris Ivan Jun 24 '21 at 17:37
35

I think you can't assign variables with values from the Airflow context outside of a task instance; they are only available at run time. Basically, there are two different steps when a DAG is loaded and executed in Airflow:

  • First, your DAG file is interpreted and parsed. It has to work and compile, and the task definitions must be correct (no syntax errors or anything). During this step, if you make function calls to fill in some values, these functions won't be able to access the Airflow context (the execution date, for example, even more so if you're doing some backfilling).

  • The second step is the execution of the DAG. It's only during this second step that the variables provided by Airflow (execution_date, ds, etc.) are available, as they are tied to an execution of the DAG.

So you can't initialize global variables using the Airflow context; however, Airflow gives you multiple mechanisms to achieve the same effect:

  1. Using a Jinja template in your command (it can be in a string in the code or in a file; both will be processed). You have the list of available templates here: https://airflow.apache.org/macros.html#default-variables. Note that some functions are also available, particularly for computing day deltas and date formatting.

  2. Using a PythonOperator in which you pass the context (with the provide_context argument). This will allow you to access the same templates with the syntax kwargs['<variable_name>']. If you need to, you can return a value from a PythonOperator; it will be stored in an XCom variable that you can use later in any template (see the sketch after this list). Access to XCom variables uses this syntax: https://airflow.apache.org/concepts.html#xcoms

  3. If you write your own operator, you can access the Airflow variables through the dict context.
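
To make the second mechanism concrete, here is a rough sketch (the task ids and the callable are invented for illustration): the PythonOperator's return value is pushed as an XCom, and a downstream templated command pulls it back via ti.xcom_pull:

def compute_date(**kwargs):
    # The callable's return value is automatically stored as an XCom.
    return kwargs['execution_date'].isoformat()

push = PythonOperator(
    task_id='push_date',
    provide_context=True,
    python_callable=compute_date,
    dag=dag)

pull = BashOperator(
    task_id='use_date',
    # Pull the value pushed by the upstream task at template-render time.
    bash_command='echo "{{ ti.xcom_pull(task_ids=\'push_date\') }}"',
    dag=dag)

push.set_downstream(pull)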

    There's technically 3 ways to do it as pointed out in other questions above. Using jinja template, using kwargs in a python_callable, or using context['execution_date'] in an operator. Probably best to remove this answer completely, or at least delete most of it. – Davos Dec 11 '17 at 03:49
    Thanks for the heads up, I learned a lot about airflow since I wrote this answer, I edited it to make it more right and precise! – Babcool Dec 12 '17 at 05:01
    I made some minor edits to make your first summary statement consistent with the 2 points below. I think this answer is now correct, although you could add more code examples for extra points. – Davos Dec 12 '17 at 05:21
    This is the most correct answer - the question asks "How do I use 'execution_date' directly and assign it to a variable in my python file?" - given that there is no reference to a python file (not using python operator) the (correct) assumption is that it was needed in the DAG, which you cannot do, which this answer says. – Simon D Apr 12 '18 at 09:35
  • I think this answer should be accepted. It explains both how things are and why they are that way. – Gregory Arenius Aug 15 '18 at 18:46
23
def execute(self, context):
    execution_date = context.get("execution_date")

This should go inside the execute() method of your custom Operator.
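
For context, a minimal custom-operator sketch around that snippet (the operator name and the logging are invented for illustration; import paths vary between Airflow versions):

import logging

from airflow.models import BaseOperator

class PrintExecutionDateOperator(BaseOperator):
    # Toy operator: logs the execution date pulled from the task context.
    def execute(self, context):
        execution_date = context.get("execution_date")
        logging.info("execution_date: %s", execution_date)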

    This is likely what you want if you are building a custom operator. – c-a Nov 29 '17 at 17:29
  • I believe it can be used in [`pre_execute`](https://github.com/apache/airflow/blob/v1-10-stable/airflow/models.py#L2802) / [`post_execute`](https://github.com/apache/airflow/blob/v1-10-stable/airflow/models.py#L2818) methods too – y2k-shubham Feb 15 '19 at 11:20
  • I've managed to get the execution date using either `context.get("execution_date")` or `kwargs["execution_date"]`, thanks! – blackraven Aug 04 '22 at 04:54
9

To print the execution date inside the callable function of your PythonOperator, you can use the following in your Airflow script; you can also derive start_time and end_time from it as follows:

def python_func(**kwargs):
    execution_date = kwargs["execution_date"]  # timezone-aware datetime (a pendulum datetime on Airflow >= 1.10)
    end_time = str(execution_date)
    start_time = str(execution_date.add(minutes=-30))  # .add() is a pendulum method

I have converted the datetime value to a string because I need to pass it into a SQL query. It can be used in other ways as well.
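
As a rough sketch of how this might be wired up, extending the snippet above (the task id and the query, table, and column names are placeholders):

def python_func(**kwargs):
    execution_date = kwargs["execution_date"]
    end_time = str(execution_date)
    start_time = str(execution_date.add(minutes=-30))
    # Placeholder query; interpolate the window into whatever SQL you run.
    return "SELECT * FROM events WHERE ts >= '%s' AND ts < '%s'" % (start_time, end_time)

run_query = PythonOperator(
    task_id='compute_window',
    provide_context=True,
    python_callable=python_func,
    dag=dag)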

0

You may consider the SimpleHttpOperator (https://airflow.apache.org/_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator). It's very simple for making HTTP requests: you can pass execution_date with the endpoint parameter via a template.
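
A rough sketch (the connection id is a placeholder; endpoint is one of the operator's templated fields, so {{ ds }} is rendered at run time):

from airflow.operators.http_operator import SimpleHttpOperator

t1 = SimpleHttpOperator(
    task_id='rest-api-1',
    http_conn_id='my_api',       # placeholder connection pointing at hostname:8000
    endpoint='run?st={{ ds }}',  # templated, so the execution datestamp is filled in
    method='POST',
    dag=dag)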

-1

Here's another way, without the context. Using the DAG's last execution time can be very helpful in scheduled ETL jobs, such as a DAG that 'downloads all newly added files'. Instead of hardcoding a datetime.datetime, use the DAG's last execution date as your time filter.

Airflow DAGs actually have a class called DagRun that can be accessed like so: dag_runs = DagRun.find(dag_id=dag_id)

Here's an easy way to get the most recent run's execution time:

from airflow.models import DagRun

def get_most_recent_dag_run(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    # Index 0 is the run currently executing, so index 1 is the last completed run.
    return dag_runs[1] if len(dag_runs) > 1 else None

Then, within your PythonOperator, you can dynamically access the DAG's last execution by calling the function you created above:

last_execution = get_most_recent_dag_run('dag')

Now it's a variable!
