I am trying to load a CSV file into BigQuery (BQ) using a custom operator in Airflow.
My custom operator runs the following load job:
load_job_config = bigquery.LoadJobConfig(
    schema=self.schema_fields,
    skip_leading_rows=self.skip_leading_rows,
    source_format=bigquery.SourceFormat.CSV,
)
load_job = client.load_table_from_uri(
    'gs://' + self.source_bucket + "/" + self.source_object,
    self.dsp_tmp_dataset_table,
    job_config=load_job_config,
)
The problem is that I always get the following error:
google.api_core.exceptions.BadRequest: 400 Provided Schema does not match Table nonprod-cloud-composer:dsp_data_transformation.tremorvideo_daily_datafeed. Field Date has changed type from TIMESTAMP to DATE
The exact same code works fine when run outside of Airflow as a standalone Python script. I am using exactly the same schema object and the same source CSV file; only the environment is different.
Below are the high-level steps I followed:
1. Created the table in BQ.
2. Loaded the data using
   LOAD DATA OVERWRITE XXXX FROM FILES ( format = 'CSV', uris = ['gs://xxx.csv']);
   This worked fine and the data was loaded into the table.
3. Truncated the table and ran the custom operator that contains the code listed above. This is where the errors appeared.
4. Created a simple Python program to test the BQ load job, and that works fine too.
It's just that whenever the same load job is triggered from Airflow, the schema detection fails and leads to all sorts of errors.
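
One way to narrow this down might be to compare the schema handed to the load job with the schema the destination table already has, since the error says the two disagree on the type of Date. The following is only a minimal sketch: diff_schemas and the Field stand-in are hypothetical names, the Impressions column is made up for illustration, and the sketch assumes each schema entry exposes name and field_type attributes (as bigquery.SchemaField does). It compares type names literally, so aliases such as INTEGER and INT64 would show up as different.

```python
from collections import namedtuple

# Hypothetical stand-in for bigquery.SchemaField, which exposes the same
# two attributes (name, field_type). Used so the sketch runs on its own.
Field = namedtuple("Field", ["name", "field_type"])


def diff_schemas(job_schema, table_schema):
    """Return (name, job_type, table_type) for every field whose type differs.

    A type of None means the field is missing on that side.
    """
    job_types = {f.name: f.field_type.upper() for f in job_schema}
    table_types = {f.name: f.field_type.upper() for f in table_schema}
    mismatches = []
    for name in sorted(job_types.keys() | table_types.keys()):
        job_type = job_types.get(name)
        table_type = table_types.get(name)
        if job_type != table_type:
            mismatches.append((name, job_type, table_type))
    return mismatches


# Made-up schemas that reproduce the kind of mismatch in the error message.
job_schema = [Field("Date", "DATE"), Field("Impressions", "INTEGER")]
table_schema = [Field("Date", "TIMESTAMP"), Field("Impressions", "INTEGER")]

for name, job_type, table_type in diff_schemas(job_schema, table_schema):
    print(f"{name}: job config says {job_type}, table has {table_type}")
```

Inside the operator, the same function could presumably be called as diff_schemas(self.schema_fields, client.get_table(self.dsp_tmp_dataset_table).schema) just before the load, assuming self.schema_fields is a list of SchemaField objects at that point. Logging the result in both environments would show whether Airflow and the standalone script really pass the same schema against the same table.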