
I am running Airflow v1.10.15 on Cloud Composer v1.16.16.

My DAG looks like this:

import logging
from datetime import datetime, timedelta

# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large

# Module-level logger used by wd_to_bq below
logger = logging.getLogger(__name__)

default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'catchup': False
}


# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)

def wd_to_bq(key, val, **kwargs):
    logger.info("workday to BQ ingestion")
    workday_extract.fetch_wd_load_bq(key, val)


start_load = DummyOperator(task_id='start', dag=dag)

end_load = DummyOperator(task_id='end', dag=dag)

for key, val in workday_config_large.endpoint_tbl_mapping.items():
    # Task 1: Process the unmatched records from the view
    workday_to_bq = PythonOperator(
        dag=dag,
        task_id=f'{key}',
        execution_timeout=timedelta(minutes=60),
        provide_context=True,
        python_callable=wd_to_bq,
        op_kwargs={'key': key, 'val': val}
    )
    start_load >> workday_to_bq >> end_load

The task fails with the error: Task exited with return code Negsignal.SIGKILL. The Python script runs fine on my local machine and completes in 15 minutes. Reports are extracted from multiple endpoints; the one that takes longest (~15 minutes) fails with this error, while the others succeed.

I have tried a lot of options, but none seem to work. Can someone help with this?

saurabh saraff
  • Cloud Composer gives you a monitoring dashboard. I'd suggest running only the task that fails and checking the memory and CPU pressure on the Airflow worker during that time. That'll tell you which resources you need to increase. – GregK Sep 29 '21 at 21:30
  • If my answer addressed your question, please consider accepting and upvoting it. If not, let me know so that I can improve my answer. – Shipra Sarkar Feb 02 '22 at 14:23

5 Answers


I resolved the issue by increasing the memory size.

https://github.com/apache/airflow/issues/10435

You should check the memory usage of the pod that acts as the worker while the task is running.

user3705451
  • Link only answer is useless. Can you elaborate on this a little more? – Toto Sep 18 '21 at 10:10
  • The data file to be downloaded is around 100 MB. I am using a cluster of 3 nodes, each an n1-standard-1, i.e. 3.75 GB RAM. Is that not enough? Should I increase it to 7.5 GB? – saurabh saraff Sep 18 '21 at 12:22
  • Did you do what @Greg suggested? This error is related to resources, so you just need to increase the resources. However, this is a known issue in Airflow and they will post an official solution when they have one. – Jaime Lopez Oct 07 '21 at 08:29

This error occurs when the allocated resources are less than what is required. DAG execution is RAM limited. More memory can be consumed depending on the DAG’s nature. So it is always preferable to use machine types with higher memory. Since you are using Cloud Composer 1, autoscaling of the resources is not possible. It would be preferable to increase your resources.

Shipra Sarkar

A message like the one below in your Airflow task logs suggests that the kernel/OS killed your process. SIGKILL (signal 9) is a directive to kill the process immediately.

{{local_task_job.py:102}} INFO - Task exited with return code Negsignal.SIGKILL

It is very likely that the task you are running (in this case the function wd_to_bq) was using a lot of resources on the worker container. I'm assuming that you are ingesting and processing some data, which can be memory intensive.

You've mentioned that it's working locally but failing in Cloud Composer. This could be because either you have a lot of RAM on your local system, or your Cloud Composer workers are also processing other DAGs that are hogging the worker memory. To confirm that this is a memory issue, you can check the monitoring dashboard provided by the cloud service.
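
As a complement to the dashboard, the task itself can report how much memory it peaked at. The following is only a minimal sketch using the standard library's resource module (Unix only); the helper names peak_rss_mib and run_with_memory_log are made up for illustration and are not part of Airflow.

```python
import logging
import resource
import sys

logger = logging.getLogger(__name__)


def peak_rss_mib():
    """Return this process's peak resident set size in MiB (Unix only)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


def run_with_memory_log(func, *args, **kwargs):
    """Call func and log the peak memory of the process afterwards."""
    try:
        return func(*args, **kwargs)
    finally:
        # Hypothetical log message; adjust to your own logging conventions.
        logger.info("peak RSS so far: %.1f MiB", peak_rss_mib())
```

A process that receives SIGKILL gets no chance to run the finally block, so this is mainly useful when running the same callable locally or on a smaller input, to compare its peak against the memory available on the worker.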

Airflow runs its tasks on workers, hence you will have to upgrade the workers with better hardware. Try increasing the RAM.

  • In the case of fully managed services like Cloud Composer or MWAA, the cloud provider should let you upgrade the underlying hardware.
  • If you are using Docker, Docker Desktop (where you can raise the overall memory limit), Swarm or Kubernetes, check what the container/pod memory limit for the worker is set to. It can then be increased accordingly in the manifest files.

Note that the purpose of Airflow is to schedule ETL tasks and orchestrate the pipeline. You shouldn't load high volumes of data into the Airflow workers and use up all of their CPU/memory. This will slow down your entire Airflow environment or SIGKILL your DAGs randomly. In most cases only the DAG/process that is using too much memory will be killed by the OOM killer, but sometimes it can kill other DAGs/processes on the same worker at the same time.

For loading/processing/writing large amounts of data, use ETL tools like Fivetran, Airbyte, Databricks, NiFi, Azure Data Factory, etc., and use Airflow for scheduling and orchestration.

ns15

I had this issue too, but took a different approach.

Have you considered how your script could use less memory, or use memory better, instead of simply increasing the available memory?

    with db_connector_warehouse.create_session() as session:
        query = session.query(offers_table)\
            .yield_per(chunk_size).enable_eagerloads(False)
        
        for df in pd.read_sql(query.statement, session.bind, chunksize=chunk_size):
            yield df

In the example above, passing chunksize to pandas (the bottom part) makes it return the dataframe in smaller chunks. However, pandas still loads everything into memory first and then hands you the part you requested (for read_sql, and likely for other loading functions such as csv / xlsx, but I haven't looked into this).

So you must ensure that you don't load the entire dataset. If you are using SQLAlchemy's ORM, you need to call yield_per on the query. For normal connections, you can set the connection to stream the results.
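
To illustrate what streaming a result set means, here is a minimal standard-library sketch using sqlite3 and cursor.fetchmany instead of SQLAlchemy. The in-memory offers table and the helper name iter_rows_in_chunks are made up for the example; with SQLAlchemy the equivalent knob is, as far as I know, the stream_results execution option.

```python
import sqlite3


def iter_rows_in_chunks(conn, query, chunk_size=1000):
    """Yield lists of at most chunk_size rows instead of one big fetchall()."""
    cursor = conn.cursor()
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        yield rows


# Hypothetical in-memory table, only here to make the sketch runnable.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE offers (id INTEGER)")
conn.executemany("INSERT INTO offers VALUES (?)", [(i,) for i in range(10)])

chunk_sizes = [
    len(rows)
    for rows in iter_rows_in_chunks(conn, "SELECT id FROM offers", chunk_size=4)
]
print(chunk_sizes)  # [4, 4, 2]
```

Each chunk can be processed and discarded before the next one is fetched, so peak memory depends on chunk_size rather than on the size of the whole result.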

A couple of useful resources if you'd rather go down the route of using less memory:

How to create a large pandas dataframe from an sql query without running out of memory?

https://pythonspeed.com/articles/pandas-sql-chunking/

And if you're not familiar with the yield flow control: What does the "yield" keyword do?

David Mendes

I had this happen when I was using a ThreadPoolExecutor, which doesn't release any resources until all the futures are done. To prevent the errors, I switched to processing four elements at a time:

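import itertools
from concurrent.futures import ThreadPoolExecutor

documents = iter(documents)  # islice must consume a shared iterator to advance
while True:
    # Take the next four documents; an empty list means we are done.
    chunk = list(itertools.islice(documents, 4))
    if not chunk:
        break
    with ThreadPoolExecutor(max_workers=4) as executor:
        for each in executor.map(TextScraper(), chunk):
            pass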
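
The same batching idea can be wrapped in a small helper. This is only a sketch: process_in_batches and fake_scrape are made-up names, and fake_scrape stands in for the real work (TextScraper in the loop above). Note that islice has to consume a single shared iterator, and its result has to be turned into a list, for the emptiness check to end the loop.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor


def process_in_batches(func, items, batch_size=4):
    """Apply func to items, using a fresh thread pool per batch of batch_size."""
    iterator = iter(items)
    results = []
    while True:
        batch = list(itertools.islice(iterator, batch_size))
        if not batch:
            break
        # The pool is shut down at the end of each batch, before the next starts.
        with ThreadPoolExecutor(max_workers=batch_size) as executor:
            results.extend(executor.map(func, batch))
    return results


# Stand-in for the real per-document work.
def fake_scrape(document):
    return len(document)


lengths = process_in_batches(fake_scrape, ["a", "bb", "ccc", "dddd", "eeeee"])
print(lengths)  # [1, 2, 3, 4, 5]
```

If the per-item results are large, it may be better to handle each batch's results inside the loop rather than accumulating them in a list as this sketch does.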
Noumenon