
I am facing an issue running Airflow DAGs. Sometimes I get the messages below in the logs and the DAG stops execution, but sometimes it works fine. I tried to Google the error but did not find anything. Below are my logs.

[2020-06-17 14:50:06,983] {__init__.py:1133} INFO - Dependencies not met for <TaskInstance: feeds__my_feed.my_feed_feed_processor 2020-06-16T12:13:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2020-06-17 14:50:06,984] {__init__.py:1133} INFO - Dependencies not met for <TaskInstance: feeds__my_feed.my_feed_feed_processor 2020-06-16T12:13:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2020-06-17 12:13:32.150826+00:00.
[2020-06-17 14:50:06,987] {logging_mixin.py:95} INFO - [2020-06-17 14:50:06,987] {jobs.py:2549} INFO - Task is not able to be run

For more details, I checked {jobs.py:2549} in the Airflow source code and found the code below. I am not sure why it stops execution in between. Please help me.

def _execute(self):
    self.task_runner = get_task_runner(self)

    def signal_handler(signum, frame):
        """Setting kill signal handler"""
        self.log.error("Received SIGTERM. Terminating subprocesses")
        self.on_kill()
        raise AirflowException("LocalTaskJob received SIGTERM signal")
    signal.signal(signal.SIGTERM, signal_handler)

    if not self.task_instance._check_and_change_state_before_execution(
            mark_success=self.mark_success,
            ignore_all_deps=self.ignore_all_deps,
            ignore_depends_on_past=self.ignore_depends_on_past,
            ignore_task_deps=self.ignore_task_deps,
            ignore_ti_state=self.ignore_ti_state,
            job_id=self.id,
            pool=self.pool):
        self.log.info("Task is not able to be run")
        return
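
From what I can tell, `_check_and_change_state_before_execution` returns False here because the metadata database already records this task instance as running, so the job only logs "Task is not able to be run" and returns. To double-check the stored state I use something like the sketch below (this assumes Airflow 1.10.x; the DAG and task IDs are the ones from my logs):

from airflow import settings
from airflow.models import TaskInstance

# Inspect what the metadata DB currently records for the task instance
# that the log reports as already running (inspection sketch only, not DAG code).
session = settings.Session()
task_instances = (
    session.query(TaskInstance)
    .filter(
        TaskInstance.dag_id == "feeds__my_feed",
        TaskInstance.task_id == "my_feed_feed_processor",
    )
    .order_by(TaskInstance.execution_date.desc())
    .all()
)
for ti in task_instances:
    print(ti.execution_date, ti.state, ti.hostname, ti.job_id)
session.close()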

Below are the DAG details:

{'dag': <DAG: feeds__my_feed>, 'ds': '2020-06-16', 'next_ds': '2020-06-17', 'next_ds_nodash': '20200617', 'prev_ds': '2020-06-15', 'prev_ds_nodash': '20200615', 'ds_nodash': '20200616', 'ts': '2020-06-16T12:13:00+00:00', 'ts_nodash': '20200616T121300', 'ts_nodash_with_tz': '20200616T121300+0000', 'yesterday_ds': '2020-06-15', 'yesterday_ds_nodash': '20200615', 'tomorrow_ds': '2020-06-17', 'tomorrow_ds_nodash': '20200617', 'END_DATE': '2020-06-16', 'end_date': '2020-06-16', 'dag_run': <DagRun feeds__my_feed @ 2020-06-16 12:13:00+00:00: scheduled__2020-06-16T12:13:00+00:00, externally triggered: False>, 'run_id': 'scheduled__2020-06-16T12:13:00+00:00', 'execution_date': <Pendulum [2020-06-16T12:13:00+00:00]>, 'prev_execution_date': datetime.datetime(2020, 6, 15, 12, 13, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'next_execution_date': datetime.datetime(2020, 6, 17, 12, 13, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'latest_date': '2020-06-16', 'macros': <module 'airflow.macros' from '/root/.local/share/virtualenvs/app-4PlAip0Q/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'params': {}, 'tables': None, 'task': <Task(PythonOperator): my_feed_feed_processor>, 'task_instance': <TaskInstance: feeds__my_feed.my_feed_processor 2020-06-16T12:13:00+00:00 [running]>, 'ti': <TaskInstance: feeds__my_feed.my_feed_processor 2020-06-16T12:13:00+00:00 [running]>, 'task_instance_key_str': 'feeds__my_feed_feed_processor__20200616', 'conf': <module 'airflow.configuration' from '/root/.local/share/virtualenvs/app-4PlAip0Q/lib/python3.6/site-packages/airflow/configuration.py'>, 'test_mode': False, 'var': {'value': None, 'json': None}, 'inlets': [], 'outlets': [],'task_id': 'my_feed_processor', 'templates_dict': None}
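
In case the DAG code is needed, here is a simplified sketch of how the DAG is defined. The actual processing logic, default args, start date, and schedule are placeholders, but the DAG ID and task ID match the logs above (Airflow 1.10.x style):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def process_feed(**context):
    # The real feed-processing logic is omitted here.
    pass


default_args = {
    "owner": "airflow",
    "retries": 0,
}

with DAG(
    dag_id="feeds__my_feed",
    default_args=default_args,
    start_date=datetime(2020, 6, 1),
    schedule_interval="13 12 * * *",  # daily at 12:13 UTC, matching the execution dates above
    catchup=False,
) as dag:
    feed_processor = PythonOperator(
        task_id="my_feed_feed_processor",
        python_callable=process_feed,
        provide_context=True,  # the template context is the dict shown above
    )
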
Subham
  • Can you show me your DAG code? Also, go through your code and add exception handling in case there is an exception that is not being caught. Is this happening for a particular DAG or for all DAGs? I am just guessing here; hope you understand. – hopeIsTheonlyWeapon Aug 10 '20 at 14:54
  • It happens only for a particular DAG. – Subham Aug 10 '20 at 14:59
  • Yes, I have added a `try/except`, but since this is logged as INFO it does not seem to be an exception. – Subham Aug 10 '20 at 15:01
  • OK, checking the code, did you find anything obvious that could cause this? Also, what are you using for your persistent volume (for your logs and DAGs)? Also check if this SO link helps: https://stackoverflow.com/questions/56878366/airflow-worker-stuck-task-is-in-the-running-state-which-is-not-a-valid-state – hopeIsTheonlyWeapon Aug 10 '20 at 15:02
  • You can change your log level through airflow.cfg; see the snippet below. – hopeIsTheonlyWeapon Aug 10 '20 at 15:02
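
For reference, a minimal sketch of the airflow.cfg change the last comment refers to (assuming Airflow 1.10.x, where the setting lives in the [core] section):

[core]
# Controls the verbosity of Airflow's logs (e.g. DEBUG, INFO, WARNING).
logging_level = DEBUG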

0 Answers