
I use Google Cloud Composer. I have a DAG that uses the pandas.read_csv() function to read a .csv.gz file. The DAG keeps trying without showing any errors. Here is the Airflow log:

 *** Reading remote log from gs://us-central1-data-airflo-dxxxxx-bucket/logs/youtubetv_gcpbucket_to_bq_daily_v2_csv/file_transfer_gcp_to_bq/2018-11-04T20:00:00/1.log.
[2018-11-05 21:03:58,123] {cli.py:374} INFO - Running on host airflow-worker-77846bb966-vgrbz
[2018-11-05 21:03:58,239] {models.py:1196} INFO - Dependencies all met for <TaskInstance: youtubetv_gcpbucket_to_bq_daily_v2_csv.file_transfer_gcp_to_bq 2018-11-04 20:00:00 [queued]>
[2018-11-05 21:03:58,297] {models.py:1196} INFO - Dependencies all met for <TaskInstance: youtubetv_gcpbucket_to_bq_daily_v2_csv.file_transfer_gcp_to_bq 2018-11-04 20:00:00 [queued]>
[2018-11-05 21:03:58,298] {models.py:1406} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of 
-------------------------------------------------------------------------------

[2018-11-05 21:03:58,337] {models.py:1427} INFO - Executing <Task(BranchPythonOperator): file_transfer_gcp_to_bq> on 2018-11-04 20:00:00
[2018-11-05 21:03:58,338] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run youtubetv_gcpbucket_to_bq_daily_v2_csv file_transfer_gcp_to_bq 2018-11-04T20:00:00 --job_id 15096 --raw -sd DAGS_FOLDER/dags/testdags/youtubetv_gcp_to_bq_v2.py']

Python code in the DAG:

from datetime import datetime,timedelta
from airflow import DAG
from airflow import models
import os
import io,logging, sys
import pandas as pd
from io import BytesIO, StringIO

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.bash_operator import BashOperator

#GCP
from google.cloud import storage
import google.cloud
from google.cloud import bigquery
from google.oauth2 import service_account

from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.models import Connection
from airflow.utils.db import provide_session
from airflow.utils.trigger_rule import TriggerRule

def readCSV(checked_date, file_name, **kwargs):
    subDir = checked_date.replace('-', '/')
    fileobj = get_byte_fileobj(BQ_PROJECT_NAME, YOUTUBETV_BUCKET, subDir + "/" + file_name)
    # returns a TextFileReader that yields DataFrames of 1,000,000 rows each
    df_chunks = pd.read_csv(fileobj, compression='gzip', memory_map=True, chunksize=1000000)
    print("done readCSV")
    return df_chunks

DAG:

file_transfer_gcp_to_bq = BranchPythonOperator(
    task_id='file_transfer_gcp_to_bq',
    provide_context=True,
    python_callable=readCSV,
    op_kwargs={'checked_date': '2018-11-03', 'file_name': 'daily_events_xxxxx_partner_report.csv.gz'}
)

The DAG runs successfully on my local Airflow installation.

def readCSV(checked_date, file_name, **kwargs):
    subDir = checked_date.replace('-', '/')
    fileobj = get_byte_fileobj(BQ_PROJECT_NAME, YOUTUBETV_BUCKET, subDir + "/" + file_name)
    df = pd.read_csv(fileobj, compression='gzip', memory_map=True)
    return df

I have tested get_byte_fileobj and it works as a standalone function.
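
For reference, get_byte_fileobj is not reproduced above; a minimal sketch of such a helper, assuming it downloads the blob into an in-memory buffer with google-cloud-storage (as described in the comments below), would be:

from io import BytesIO
from google.cloud import storage

def get_byte_fileobj(project, bucket_name, blob_path):
    # Hypothetical sketch: download the whole GCS object into an in-memory buffer.
    client = storage.Client(project=project)
    blob = client.bucket(bucket_name).blob(blob_path)
    buf = BytesIO()
    blob.download_to_file(buf)  # note: the entire (compressed) file is held in worker memory
    buf.seek(0)
    return buf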

  • Try adding manual logging lines to the file. Should provide more insight on how far into the execution it gets. – Meghdeep Ray Nov 06 '18 at 07:59
  • Can you add your DAG file and provide more logging info? – kaxil Nov 06 '18 at 12:00
  • @MeghdeepRay not sure how I can do that? Should I create a separate folder for logs on Google Composer? – MT467 Nov 06 '18 at 18:48
  • Are you aware of the fact that pandas "does not have native GCS support" (https://stackoverflow.com/questions/48569618/how-do-i-use-pandas-read-csv-on-google-cloud-ml#answer-48570321)? Are you trying to read the file like "gs://..."? Have you tried using the mapped directory (/home/airflow/gcs/data) (https://cloud.google.com/composer/docs/concepts/cloud-storage#folders_in_the_storage_name_bucket)? See the sketch after these comments. If this doesn't help, edit your post adding all the relevant pieces of code in your DAG (modules and so on). – VictorGGl Nov 07 '18 at 10:10
  • @VictorGGl yeah, I know that; I am using a blob and then download_to_file(). It works for CSV files that are not large, say less than 1 GB. – MT467 Nov 07 '18 at 16:33
  • Can you add that part of the code and the modules you are importing, as well as information about your Composer environment (Composer and Airflow version, Python version) and the extra packages you have installed (if any) with their corresponding versions? – VictorGGl Nov 07 '18 at 16:36
  • @VictorGGl updated: composer-1.3.0-airflow-1.9.0, google-cloud, google-cloud-storage, google-cloud-bigquery, google-oauth2 – MT467 Nov 07 '18 at 19:02
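
For reference, the mapped-directory approach from VictorGGl's comment is sketched below, assuming the file has been copied into the environment bucket's data/ folder, which Composer mounts at /home/airflow/gcs/data on the workers:

import os
import pandas as pd

DATA_DIR = "/home/airflow/gcs/data"  # Composer maps the bucket's data/ folder here

def read_csv_from_mapped_dir(checked_date, file_name):
    sub_dir = checked_date.replace('-', '/')
    path = os.path.join(DATA_DIR, sub_dir, file_name)
    # pandas infers gzip compression from the .gz extension when given a path
    return pd.read_csv(path)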

2 Answers


Based on this discussion in the Airflow Google Composer group, it is a known issue. One of the reasons can be exhausting all of the Composer resources (in my case, memory).
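
If the resource that runs out is memory, one mitigation, sketched below (an illustration, not the DAG code above), is to consume the CSV chunk by chunk inside the task instead of materialising everything at once, so that only one chunk of rows is held in memory at a time:

import pandas as pd

def process_csv_in_chunks(fileobj, chunksize=100000):
    # Illustrative only: iterate over the gzipped CSV one chunk at a time.
    total_rows = 0
    for chunk in pd.read_csv(fileobj, compression='gzip', chunksize=chunksize):
        # do the per-chunk work here (transform, load to BigQuery, ...)
        total_rows += len(chunk)
    return total_rows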

– MT467

I ran into a similar issue recently.

In my case it was because the Kubernetes workers were overloaded.

You can watch the worker performance on the Kubernetes dashboard to see whether your case is a cluster overloading issue.

If so, you can try setting the Airflow configuration value celeryd_concurrency lower to reduce the parallelism on each worker and see whether the cluster load goes down.
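
For reference, celeryd_concurrency lives in the [celery] section of the Airflow configuration; on Composer you would set it through the environment's Airflow configuration overrides rather than editing airflow.cfg directly. Roughly (ENVIRONMENT_NAME and LOCATION are placeholders; check the gcloud composer documentation for the exact override syntax):

gcloud composer environments update ENVIRONMENT_NAME --location LOCATION \
    --update-airflow-configs=celery-celeryd_concurrency=4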


– Bruce Kuo