
I'm trying to run an operator that is created inside a function called by another PythonOperator. It seems I missed something; can someone help me find out what?

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago


dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0

}

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
        dag=dag)
    return t1



with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
 
    python_task

Error:

[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
    dag=dag)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
    self.dag = dag
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
    dag.add_task(self)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
    task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410

One workaround was suggested by Racooneer (but the issue is still there):

Thanks, Racooneer!!!

Removing default_args helped to get past the error, but I'm still not able to see the bash command's output.

TheDataGuy
  • may I offer a different approach, since I think what you try to do is not meant to be: you could use the subprocess library from python ```import subprocess``` and do something like this ```subprocess.run('echo "wwwwwwwwwwwwwww"', shell=True, check=True)```... I'm pretty sure you have to look at the docs to figure out your intended result (https://docs.python.org/3/library/subprocess.html). I guess you want to start a script like this and not just echo, since the only place this echo would be shown would be the airflow log – Racooneer Oct 10 '20 at 08:00
  • It's a good one, but I'm trying to implement a dynamic DAG (https://towardsdatascience.com/creating-a-dynamic-dag-using-apache-airflow-a7a6f3c434f3), so this is the base DAG I'm trying – TheDataGuy Oct 10 '20 at 08:49
  • cool idea... I checked the docs of the ```BashOperator``` and I think the ```default_args=args``` is not part of the operator and is, hence, creating your troubles. In the example from towardsdatascience, he uses a ```PythonOperator()``` which usually makes some trouble if you don't provide some args argument (that may not even do anything). But the ```BashOperator()``` does not seem to need them. If you need to provide more parameters to the bash script, there should be some Jinja templates for bash commands you handle with a ```params``` parameter (sorry, I have no link to this) – Racooneer Oct 10 '20 at 09:25
  • I tried removing that, but still not working – TheDataGuy Oct 10 '20 at 09:35
  • And updated the question with the new error – TheDataGuy Oct 10 '20 at 09:36
  • Oh just a min, I changed that, and no errors also no output – TheDataGuy Oct 10 '20 at 09:39
  • where should the output occur according to your plan? – Racooneer Oct 10 '20 at 09:40
  • Here is the log: https://pastebin.com/p44Geiiw – TheDataGuy Oct 10 '20 at 09:42
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/222815/discussion-between-bhuvanesh-and-racooneer). – TheDataGuy Oct 10 '20 at 09:43
  • to be honest, from what I see there, airflow is only logging the output of directly called operators and does not tunnel the indirectly called output through. And this goes beyond my understanding. I'd advise a different test, where a test line of output with the current timestamp is written to a file or something like this to see if it works. But I think I cannot help you further. Sorry – Racooneer Oct 10 '20 at 09:50
  • I think that task didn't run, because I have a touch command to create a file. After the execution I'm not able to see the file. – TheDataGuy Oct 10 '20 at 09:56
  • you should return t1.execute() – Alejandro Kaspar Oct 10 '20 at 14:49
  • ERROR - execute() missing 1 required positional argument: 'context', I tried def `postgres_to_gcs(**context):` – TheDataGuy Oct 10 '20 at 14:52
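Racooneer's subprocess suggestion from the comments can be sketched as follows. This is a minimal illustration, not the asker's actual command; the echoed string is a placeholder, and the function name simply mirrors the one in the question:

```python
import subprocess

def postgres_to_gcs():
    # Run the shell command directly with the standard library instead of
    # constructing a BashOperator inside the callable.
    result = subprocess.run(
        'echo "task1"',
        shell=True,
        check=True,           # raise CalledProcessError on a non-zero exit code
        capture_output=True,  # collect stdout/stderr instead of inheriting them
        text=True,            # decode the output as str rather than bytes
    )
    print(result.stdout)      # anything printed here lands in the Airflow task log
    return result.stdout.strip()
```

Because the output is captured and printed inside the callable, it shows up in the task's own log, which addresses the "no output" problem discussed above.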

1 Answer


I'm not exactly sure what you are trying to do, but the code you posted in the python function doesn't actually execute the operator.

This should work just fine:

def postgres_to_gcs():
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        xcom_push=True  # Note: there is no dag=dag here!
    )
    t1.execute(dict())

with DAG(
        'python_dag',
        description='Python DAG',
        schedule_interval='*/15 * * * *',
        start_date=datetime(2018, 1, 1),
        catchup=False
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs
    )

Note that operators are Python classes. When you call an operator inside a python function, remember that you are only invoking the class constructor; to actually run the operator you need to call its execute method.

Note: Using an operator inside another operator is not good practice. You should use hooks or create custom operators. You can read more about why in the following answer.
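For completeness, here is a sketch of the more conventional structure, assuming the goal is simply to run both pieces of work: the BashOperator is declared as its own task at DAG level and chained after the Python task, rather than constructed inside the callable. The task ids and the echo command are carried over from the question; `some_python_work` is a placeholder callable:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def some_python_work():
    # placeholder for whatever the PythonOperator should actually do
    print("python task ran")

with DAG(
        'python_dag',
        description='Python DAG',
        schedule_interval='*/15 * * * *',
        start_date=datetime(2018, 1, 1),
        catchup=False,
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=some_python_work,
    )
    # The bash command is a first-class task, so its output lands in its own log
    count_lines = BashOperator(
        task_id='count_lines',
        bash_command='echo "task1"',
        xcom_push=True,
    )
    python_task >> count_lines  # run the python task, then the bash task
```

Structured this way, the scheduler owns both tasks, retries and logs work per task, and nothing needs to call `execute()` by hand.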

Elad Kalif
    Seeing an operator used this way inside a Python callable kind of blew my mind. I don't think I've seen this example shown anywhere else. This helped me with a thorny issue - thanks so much for this answer! I wish I could upvote multiple times. – gregoltsov Dec 18 '20 at 22:03
  • I tried the above code piece but I am getting the below error: ```python t1.execute() TypeError: execute() missing 1 required positional argument: 'context' ``` – Raj Jan 29 '21 at 12:23
  • @Raj probably your operator has provide_context=True? – Elad Kalif Jan 29 '21 at 12:27
  • I copied the code in above example and pasted as is with these imports ```python from airflow import DAG from datetime import datetime from airflow.operators.python_operator import PythonOperator from airflow.operators.bash_operator import BashOperator ``` Airflow version 1.10.12 – Raj Jan 29 '21 at 12:36
  • @Raj check if `t1.execute(dict())` solves your issue – Elad Kalif Jan 29 '21 at 13:22
  • @Raj Usually python callables have context, so we can pass this to execute, but since it wasn't in the function provided in the question I didn't add it. – Elad Kalif Feb 01 '21 at 10:13
  • @Elad thanks a lot bro. I'm completely new to Airflow and python as well so I was looking for executing an operator from within a for-loop inside a function and I kinda not left with any option but then your solution did work for my case. Thanks again! – Akhil Oct 30 '21 at 20:51