TL;DR How do I create a dynamic EmailOperator that sends a file from a filepath that is an XCom property
Hello Folks,
I'm using Apache Airflow 2.0.0.b2. My issue is that my DAG creates a file whose name changes at runtime. I'd like to email this file out but I'm having problems getting the dynamic file name into my EmailOperator.
Things I've Tried that Have Failed!:
Use templating for
files
property.files=["{{ ti.xcom_pull(key='OUTPUT_CSV') }}"],
Unfortunately, templating only works when the field in the operator is marked as such.
files
is not a templatable field on the EmailOperatorDynamically create my task using a function
def get_email_operator(?...): export_file_path = ti.xcom_pull(key='OUTPUT_CSV') email_subject = 'Some Subject' return EmailOperator( task_id="get_email_operator", to=['someemail@somedomain.net'], subject=email_subject, files=[export_file_path,], html_content='<br>', dag=current_dag) ..task3 >> get_email_operator() >> task4
Unfortunately, I can't seem to figure out how to pass the current
**kwargs
orti
information into my function call to get the current filepath.
EDIT: Elad's answer below set me in the right direction. The only thing I had to to do to make it work was to add kwargs
when calling op.execute()
Solution:
def get_email_operator(**kwargs):
export_file_path = kwargs['ti'].xcom_pull(key='OUTPUT_CSV')
email_subject = 'Termed Drivers - ' + date_string
op = EmailOperator(
task_id="get_email_operator",
to=['someemail@somedomain.net'],
subject=email_subject,
files=[export_file_path,],
html_content='<br>')
op.execute(kwargs)