I have a `for` loop in my DAG that creates several tasks using `KubernetesPodOperator`. These tasks run in parallel, and the k8s nodes have enough resources for all of them to run at the same time. The containers share an EFS volume (mounted as a PVC) and perform read/write operations on it.
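The pod spec in the DAG requests 1000m CPU and 2Gi memory per pod. One way to sanity-check the "enough resources" claim is to compute how many such pods fit into a node's allocatable capacity (shown under "Allocatable" by `kubectl describe node`). This is a minimal sketch under stated assumptions: the node figures are hypothetical, only requests are counted (the Kubernetes scheduler places pods by requests, not limits), pods already running on the node are ignored, and only the quantity suffixes listed in the code are parsed.

```python
# Hypothetical helpers: parse a subset of Kubernetes quantity strings.
# Only binary memory suffixes (Ki/Mi/Gi/Ti) and millicore/plain CPU values are handled.
MEMORY_SUFFIXES = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def cpu_to_millicores(quantity: str) -> int:
    """'1000m' -> 1000, '2' -> 2000, '0.5' -> 500."""
    if quantity.endswith("m"):
        return int(quantity[:-1])
    return int(float(quantity) * 1000)


def memory_to_bytes(quantity: str) -> int:
    """'2Gi' -> 2147483648; a bare number is taken as bytes."""
    for suffix, factor in MEMORY_SUFFIXES.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)


def pods_that_fit(node_allocatable: dict, pod_requests: dict) -> int:
    """How many identical pods fit on one empty node, judged by requests only."""
    by_cpu = cpu_to_millicores(node_allocatable["cpu"]) // cpu_to_millicores(pod_requests["cpu"])
    by_mem = memory_to_bytes(node_allocatable["memory"]) // memory_to_bytes(pod_requests["memory"])
    return min(by_cpu, by_mem)


if __name__ == "__main__":
    requests = {"cpu": "1000m", "memory": "2Gi"}  # requests from the pod spec in the DAG
    node = {"cpu": "7910m", "memory": "30Gi"}     # hypothetical node allocatable values
    print(pods_that_fit(node, requests))  # 7 (CPU is the limiting resource here)
```

Multiplying the result by the number of nodes gives an upper bound to compare against `num_pods`.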
The issue is that all of these tasks show green (running) in the Airflow UI for a while (~2 hrs). Then some of them are marked as failed (red) in the UI, while the corresponding containers are still in the Running state on the k8s cluster (as shown by `kubectl get pods`).
Surprisingly, after some time the failed tasks turn green again (running) in the UI and finish successfully. The problem is that, because some tasks are momentarily marked as failed, the downstream task (trigger_rule = all_success) is marked as skipped, even though the upstream tasks later finish just fine.
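To make the timing problem concrete, here is a simplified, hypothetical model of how an `all_success` trigger rule is evaluated for the downstream task in a single scheduler pass. It is not Airflow's source code. As far as I understand Airflow's documented behavior, a failed upstream sets the downstream to `upstream_failed` straight away, without waiting for the remaining upstream tasks, while a skipped upstream sets it to `skipped`; it may be worth checking which of the two states the downstream task actually shows. The `up_for_retry` case assumes the upstream operators are configured with `retries`.

```python
# Simplified, hypothetical model of one evaluation of the all_success trigger rule.
# It ignores everything except the states of the upstream task instances.


def evaluate_all_success(upstream_states):
    """Return what happens to the downstream task in one evaluation pass."""
    if any(s in ("failed", "upstream_failed") for s in upstream_states):
        return "upstream_failed"  # decided immediately, other upstreams may still be running
    if any(s == "skipped" for s in upstream_states):
        return "skipped"
    if all(s == "success" for s in upstream_states):
        return "run"
    return "wait"  # e.g. an upstream is still 'running' or 'up_for_retry'


if __name__ == "__main__":
    # One upstream pod task is shown as failed while the others are running.
    print(evaluate_all_success(["running", "failed", "running"]))  # upstream_failed
    # Later every upstream task succeeds; in this model that alone would allow a run.
    print(evaluate_all_success(["success", "success", "success"]))  # run
    # With retries, a failed attempt is 'up_for_retry', so the downstream keeps waiting.
    print(evaluate_all_success(["running", "up_for_retry", "success"]))  # wait
```

The point of the model is that the downstream decision is taken during the window in which a task is shown as failed; once the downstream task has a finished state, a later success of its upstream tasks does not by itself cause it to be evaluated again.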
Logs report no issues with the code.
Why is this happening, and is there a way to make the downstream task run once the upstream tasks finish normally, even though it was skipped (from within the DAG itself, not through an external command such as the CLI)? Could this be an EFS problem?
Where should I look to find out why tasks are momentarily marked as failed and then pass just fine after a while?
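One way to narrow this down is to snapshot both views at the moment a task turns red and list the disagreements; the time of each snapshot can then be lined up with the scheduler and worker logs from the same moment. The helper below is hypothetical, not part of Airflow or kubectl. It takes a dict of Airflow task states (for example copied from the UI) and the JSON printed by `kubectl get pods -o json`, and lists tasks that Airflow no longer shows as running although their pod is still Pending or Running. It assumes each pod carries a `task_id` label naming its Airflow task; `kubectl get pods --show-labels` shows how the pods are actually labelled, and `label_key` can be changed to match.

```python
import json


def find_state_mismatches(airflow_states, kubectl_pods_json, label_key="task_id"):
    """Return sorted (task_id, airflow_state, pod_phase) tuples for tasks that
    Airflow does not show as running while their pod is still alive.

    airflow_states: dict {task_id: state}, e.g. {"my_task_1": "failed"}.
    kubectl_pods_json: text output of `kubectl get pods -o json`.
    label_key: assumed pod label holding the Airflow task_id.
    """
    pods = json.loads(kubectl_pods_json)["items"]
    mismatches = []
    for pod in pods:
        labels = pod["metadata"].get("labels") or {}
        task_id = labels.get(label_key)
        if task_id not in airflow_states:
            continue  # not a pod belonging to one of the tracked tasks
        phase = pod["status"].get("phase", "Unknown")
        state = airflow_states[task_id]
        # While a KubernetesPodOperator task waits on its pod, Airflow should show it as running.
        if state != "running" and phase in ("Pending", "Running"):
            mismatches.append((task_id, state, phase))
    return sorted(mismatches)


if __name__ == "__main__":
    # Hypothetical sample data shaped like `kubectl get pods -o json` output.
    sample = json.dumps({"items": [
        {"metadata": {"name": "my-task-0-abc", "labels": {"task_id": "my_task_0"}},
         "status": {"phase": "Running"}},
        {"metadata": {"name": "my-task-1-def", "labels": {"task_id": "my_task_1"}},
         "status": {"phase": "Running"}},
    ]})
    states = {"my_task_0": "running", "my_task_1": "failed"}
    print(find_state_mismatches(states, sample))  # [('my_task_1', 'failed', 'Running')]
```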
Below is my DAG.
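```python
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# volume, volume_mount, num_pods, kube_config_path, some_other_task and dag
# are defined elsewhere in the DAG file (omitted here).

my_task_spec = k8s.V1Pod(
    # metadata must be a V1ObjectMeta object, not a plain string
    metadata=k8s.V1ObjectMeta(name="my_task"),
    spec=k8s.V1PodSpec(
        containers=[
            k8s.V1Container(
                name="base",
                image="#{image}#",
                volume_mounts=[volume_mount],  # shared EFS volume (PVC)
                resources=k8s.V1ResourceRequirements(
                    requests={"cpu": "1000m", "memory": "2Gi"},
                    limits={"cpu": "2000m", "memory": "4Gi"},
                ),
            )
        ],
    ),
)

for i in range(num_pods):
    # skipping details
    KubernetesPodOperator(
        task_id=task_id,
        full_pod_spec=my_task_spec,
        volumes=[volume],
        name=task_id,
        get_logs=True,
        is_delete_operator_pod=True,
        in_cluster=False,
        config_file=kube_config_path,
        startup_timeout_seconds=1200,
        trigger_rule="all_success",
        dag=dag,
    )

# Fan-in: every my_task* task becomes an upstream of some_other_task
for task in dag.tasks:
    if task.task_id.startswith("my_task"):
        task.set_downstream(some_other_task)  # this is marked as skipped once any task above fails
```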