
Description

  • How do I run multiple ExternalPythonOperator tasks (I need different packages/versions for different DAG tasks) one after another, in series, without depending on the previous task's success (no "upstream_failed")?
  • It should simply execute the tasks one after the other, regardless of whether any of them fails or succeeds.
  • You might ask why I don't just create separate DAG files. The point is that I want to run a couple of extremely resource-intensive tasks back to back, in a time window well separated from all other tasks, to make sure they don't cause any disruption. They also have to be separated from each other, because each one could disrupt the others due to resource constraints on the server as well as other external reasons.

My Code

import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()


my_default_args = {
    'owner': 'me',
    #'email': ['myemail@myemail.com'],
    'email_on_failure': True,
    #'email_on_retry': True,
    #'retries': 1,
#     'retry_delay': timedelta(minutes=1)
}


with DAG(
    dag_id='some_dag_id_comes_here',
    schedule='1 * * * *', 
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), # this is from where it starts counting time to run tasks, NOT like cron
    catchup=False,
    default_args=my_default_args,
    tags=['xyz1'],
    ) as dag:
    @task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3')
    def func1(): 
        print('elements of task 1')
        time.sleep(10)

    @task.external_python(task_id="task2", python='/opt/airflow/my_env/bin/python3')
    def func2(): 
        print('elements of task 2')
        time.sleep(10)


    func1 >> func2

Things That I have tried

CODE 1

  @task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
trigger_rule=TriggerRule.ALL_DONE)

ERROR 1

Broken DAG: [/opt/airflow/dags/test_file.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/name.py", line 224, in <module>
    task1 >> task2 >> task3 >> task4 >> task5
TypeError: unsupported operand type(s) for >>: '_TaskDecorator' and '_TaskDecorator'
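
The TypeError happens because the chained names refer to the decorated functions themselves, not to task instances: with TaskFlow decorators such as @task.external_python, each function has to be called inside the DAG block so it returns an object that supports >>. A minimal sketch of the chaining part, assuming the same func1/func2 definitions as in the code above:

# Calling the decorated functions instantiates the tasks; the returned
# objects (XComArgs) can then be chained with >>.
t1 = func1()
t2 = func2()
t1 >> t2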

CODE 2

  @task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
trigger_rule=TriggerRule.none_skipped)

ERROR 2

Broken DAG: [/opt/airflow/dags/test_file.py] Traceback (most recent call last):
  File "/opt/airflow/dags/test_file.py", line 51, in <module>
    ,trigger_rule=TriggerRule.none_skipped
  File "/usr/local/lib/python3.8/enum.py", line 384, in __getattr__
    raise AttributeError(name) from None
AttributeError: none_skipped
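
The AttributeError comes from the enum member name: TriggerRule member names are uppercase (for example TriggerRule.NONE_SKIPPED or TriggerRule.ALL_DONE), while their string values are lowercase ('none_skipped', 'all_done'). A minimal sketch of both spellings, assuming the imports shown:

from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule

# Enum member names are uppercase; the equivalent string value is lowercase.
@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
                      trigger_rule=TriggerRule.ALL_DONE)  # or trigger_rule='all_done'
def func1():
    print('elements of task 1')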
  • I'm a bit confused by your question. You want to run multiple ExternalPythonOperators in sequence, regardless of whether the previous one succeeded or failed? Can you not use a [Trigger Rule](https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#trigger-rules)? That way a single DAG can run these heavy tasks. Where do multiple DAGs come into it? – Daniel T Nov 01 '22 at 01:29
  • @DanielT: OK, so no multiple DAGs, but multiple tasks. This is typical of the Airflow documentation: it mentions something like "all_success and all_failed" but gives no practical example of how to use them. Can you show how I can add it to my code? – sogu Nov 01 '22 at 09:27

1 Answer

DAG, my solution

import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()


my_default_args = {
    'owner': 'me',
    #'email': ['myemail@myemail.com'],
    'email_on_failure': True,
    #'email_on_retry': True,
    #'retries': 1,
#     'retry_delay': timedelta(minutes=1)
}


with DAG(
    dag_id='some_dag_id_comes_here',
    schedule='1 * * * *', 
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), # this is from where it starts counting time to run tasks, NOT like cron
    catchup=False,
    default_args=my_default_args,
    tags=['xyz1'],
    ) as dag:
    @task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',trigger_rule='all_done')
    def func1(): 
        import time
        print('elements of task 1')
        time.sleep(10)

    @task.external_python(task_id="task2", python='/opt/airflow/my_env/bin/python3',trigger_rule='all_done')
    def func2(): 
        import time
        print('elements of task 2')
        time.sleep(10)


    func1() >> func2()
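
With trigger_rule='all_done' on every task, each one starts as soon as its upstream task has finished, whether that run succeeded or failed, so the serial chain keeps going. For more than two of these heavy tasks, here is a sketch of the same pattern using Airflow's chain() helper (assuming any additional functions are decorated the same way as func1 and func2):

from airflow.models.baseoperator import chain

# Equivalent to func1() >> func2(), and scales to longer serial pipelines;
# every task still runs because each carries trigger_rule='all_done'.
chain(func1(), func2())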
