
I am new to Airflow. In my company we currently use crontab and a custom in-house scheduler for our ETL pipelines, and we are now planning to move all of our data-pipeline scenarios to Apache Airflow. While exploring its features I was not able to find a unique ID for each task instance/DAG run. Most of the solutions I found ended up pointing at macros and templates, but none of them provide a unique ID for a task. However, I can see an incremental unique ID in the UI for each task. Is there an easy way to access those values inside my Python methods? The main use case is that I need to pass those IDs as parameters to our Python/Ruby/Pentaho jobs, which are called as scripts/methods.

For example:

My shell script test.sh needs two arguments: a run_id and a collection_id. Currently we generate this unique run_id from a centralised database and pass it to the jobs. If it is already present in the Airflow context, we would like to use that instead.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Minimal DAG definition so the snippet runs; the name and start date
# are placeholders
dag = DAG(
    "putfiles_s3_example",
    start_date=datetime(2017, 9, 1),
)

# run_id and collection_id are placeholders, currently filled in from a
# centralised database outside Airflow
shell_command = "/data2/test.sh -r run_id -c collection_id"

putfiles_s3 = BashOperator(
    task_id='putfiles_s3',
    bash_command=shell_command,
    dag=dag)

I am looking for a unique run_id (either DAG level or task level) for each run of this DAG, whether scheduled or triggered manually.

Note: this is a sample task; there will be multiple tasks dependent on it in this DAG. I am attaching a screenshot of the job_id column from the Airflow UI.

Thanks, Anoop R

Anoop R
  • Include your code. – Daniel Lee Sep 15 '17 at 14:54
  • Have you looked at UUID? https://stackoverflow.com/questions/534839/how-to-create-a-guid-uuid-in-python#534851 – Micah Elliott Sep 15 '17 at 15:42
  • @MicahElliott Thanks for your suggestion. We can generate a random ID like that, or with a shell random command, but I was looking for an ID generated by Airflow itself, just like job_id. Attaching a screenshot of the Airflow UI for reference. – Anoop R Sep 15 '17 at 18:34

1 Answer


{{ ti.job_id }} is what you want:

from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow import DAG


dag = DAG(
    "job_id",
    start_date=datetime(2018, 1, 1),
)

with dag:
    BashOperator(
        task_id='unique_id',
        # {{ ti.job_id }} is rendered from the task instance at runtime
        bash_command="echo {{ ti.job_id }}",
    )

A log from this execution looks like:

[2018-01-03 10:28:37,523] {bash_operator.py:80} INFO - Temporary script location: /tmp/airflowtmpcj0omuts//tmp/airflowtmpcj0omuts/unique_iddq7kw0yj  
[2018-01-03 10:28:37,524] {bash_operator.py:88} INFO - Running command: echo 4
[2018-01-03 10:28:37,621] {bash_operator.py:97} INFO - Output:
[2018-01-03 10:28:37,648] {bash_operator.py:101} INFO - 4

Note that this will only be valid at runtime, so the "Rendered Template" view in the webui will show None instead of a number.
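
Applied to the use case in the question, the template can go straight into the shell command. A minimal sketch, assuming test.sh takes -r and -c as described; the DAG name and the collection_id value are placeholders, with collection_id supplied through the operator's params rather than generated by Airflow:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    "putfiles_s3_job_id",  # placeholder DAG name for this sketch
    start_date=datetime(2018, 1, 1),
)

with dag:
    BashOperator(
        task_id='putfiles_s3',
        # ti.job_id is rendered at runtime; params.collection_id comes from
        # the params dict below and is just a placeholder value here
        bash_command="/data2/test.sh -r {{ ti.job_id }} -c {{ params.collection_id }}",
        params={'collection_id': 42},
    )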

Ash Berlin-Taylor
  • {{ ti.job_id }} I can use with any operator, and I can pass it as an argument to a Python method too, right? If you don't mind, can you show me an example of passing this value to a Python method? Thanks, Ash Berlin-Taylor. – Anoop R Jan 03 '18 at 11:13
  • I got it working for a Python method: def test_failure(**kwargs): ti = kwargs['ti']; print ti.job_id (see the fuller sketch after these comments). @Ash Is there any document listing all the values available through "task_instance"? This URL doesn't explain much about "ti": https://pythonhosted.org/airflow/code.html#macros – Anoop R Jan 05 '18 at 12:02
  • Not right now, no. "ti" is a TaskInstance (https://pythonhosted.org/airflow/code.html#airflow.models.TaskInstance), but that page doesn't document the properties of the object, so I go to the code. – Ash Berlin-Taylor Jan 05 '18 at 15:42
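
Expanding the comment exchange above into runnable form: a minimal sketch of reading job_id inside a Python callable, assuming Airflow 1.x, where provide_context=True injects the task instance into the callable's kwargs as 'ti'. The DAG and function names are placeholders, not from the original post.

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    "job_id_python",  # placeholder DAG name for this sketch
    start_date=datetime(2018, 1, 1),
)

def print_job_id(**kwargs):
    # With provide_context=True, Airflow passes the task instance as kwargs['ti']
    ti = kwargs['ti']
    print(ti.job_id)

with dag:
    PythonOperator(
        task_id='print_job_id',
        python_callable=print_job_id,
        provide_context=True,  # needed in Airflow 1.x to receive context kwargs
    )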