Description
- How can I run multiple ExternalPythonOperator tasks (I need different packages/versions for different DAG tasks) one after another, serially, without each one depending on the previous task's success (i.e. without tasks ending up as "upstream_failed")?
- It should simply execute the tasks one after the other, without caring whether any of them fails or succeeds.
- You might ask why I don't just create separate DAG files. The point is that I want to run a couple of extremely resource-intensive tasks one after another, in a time window well separated from all other tasks, to make sure they don't cause any disruption. They also have to be separated from each other, because each one could disrupt the others, both because of resource constraints on the server and for other external reasons. A rough sketch of what I am after is shown after this list.
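If I understand trigger rules correctly, something along these lines is roughly what I am imagining - a sketch I have not been able to get working (see my attempts further down); the dag_id and task_ids here are purely illustrative:

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id='serial_regardless_of_failure_sketch',  # illustrative dag_id
    schedule='1 * * * *',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:

    @task.external_python(task_id="heavy_task_1", python='/opt/airflow/my_env/bin/python3')
    def heavy_task_1():
        print('resource intense work 1')

    # ALL_DONE should mean: start once all upstream tasks have finished,
    # regardless of whether they succeeded or failed
    @task.external_python(task_id="heavy_task_2", python='/opt/airflow/my_env/bin/python3',
                          trigger_rule=TriggerRule.ALL_DONE)
    def heavy_task_2():
        print('resource intense work 2')

    # calling the decorated functions creates the task instances that can be chained
    heavy_task_1() >> heavy_task_2()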
My Code
import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task
log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()
my_default_args = {
    'owner': 'me',
    #'email': ['myemail@myemail.com'],
    'email_on_failure': True,
    #'email_on_retry': True,
    #'retries': 1,
    #'retry_delay': timedelta(minutes=1)
}
with DAG(
    dag_id='some_dag_id_comes_here',
    schedule='1 * * * *',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),  # this is from where it starts counting time to run tasks, NOT like cron
    catchup=False,
    default_args=my_default_args,
    tags=['xyz1'],
) as dag:

    @task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3')
    def func1():
        print('elements of task 1')
        time.sleep(10)

    @task.external_python(task_id="task2", python='/opt/airflow/my_env/bin/python3')
    def func2():
        print('elements of task 2')
        time.sleep(10)
    func1 >> func2
Things that I have tried
- How to Trigger a DAG on the success of a another DAG in Airflow using Python? - my question is the polar opposite
- Triggering the external dag using another dag in Airflow - ???
- How to schedule the airflow DAG to run just after the end of the previous running DAG?
I have also tried trigger rules - https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#trigger-rules
CODE 1
@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
trigger_rule=TriggerRule.ALL_DONE)
ERROR 1
Broken DAG: [/opt/airflow/dags/test_file.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/name.py", line 224, in <module>
    task1 >> task2 >> task3 >> task4 >> task5
TypeError: unsupported operand type(s) for >>: '_TaskDecorator' and '_TaskDecorator'
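I suspect this TypeError has nothing to do with the trigger rule itself, but with the fact that I am chaining the decorated functions directly instead of calling them first. Something like this is what I would try next (untested sketch, variable names are just illustrative):

# untested sketch: call the decorated functions so that >> chains the resulting
# task instances instead of the bare '_TaskDecorator' objects
t1 = func1()
t2 = func2()
t1 >> t2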
CODE 2
@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
trigger_rule=TriggerRule.none_skipped)
ERROR 2
Broken DAG: [/opt/airflow/dags/test_file.py] Traceback (most recent call last):
  File "/opt/airflow/dags/test_file.py", line 51, in <module>
    ,trigger_rule=TriggerRule.none_skipped
  File "/usr/local/lib/python3.8/enum.py", line 384, in __getattr__
    raise AttributeError(name) from None
AttributeError: none_skipped
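My guess is that this AttributeError only happens because TriggerRule is an enum with uppercase member names, so the attribute would have to be written like this (untested, and even then I am not sure this gives me the serial-run-regardless-of-failure behaviour I am after):

from airflow.utils.trigger_rule import TriggerRule

# enum members are uppercase; their values are the lowercase strings,
# so the plain string 'none_skipped' should also be accepted
@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
                      trigger_rule=TriggerRule.NONE_SKIPPED)
def func1():
    print('elements of task 1')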