I have the following Airflow code, which reads tables from a MySQL DB into Google Cloud Storage and then loads them into Google BigQuery.
I have configured the connections, along with the service accounts for both MySQL and GCP, in the Admin tab of Airflow.
extract = MySqlToGoogleCloudStorageOperator(
    task_id='extract_actors',
    mysql_conn_id='factset_test',
    google_cloud_storage_conn_id='gcp_test',
    sql='SELECT * FROM mysql.time_zone',
    bucket='airflow_1',
    filename='actors/actors{}.json',
    schema_filename='schemas/actors.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_bq_time_zone",
    bigquery_conn_id='gcp_test',
    google_cloud_storage_conn_id='gcp_test',
    bucket='airflow_1',
    destination_project_dataset_table="airflow.mysql_time_zone",
    source_objects='actors/actors0.json',
    schema_object='schemas/actors.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)
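For context, my understanding is that the {} in the filename template is filled with a chunk counter when the export operator splits the result set across multiple files, so the first (and here only) output file is actors/actors0.json — that is the object name I pass to the load task. A trivial sketch of how I assume the template is filled (the counter name is my own, not the operator's):

```python
# The export operator formats the filename template with a per-chunk index.
filename_template = 'actors/actors{}.json'

chunk_index = 0  # first chunk of the exported table
first_object = filename_template.format(chunk_index)
print(first_object)  # actors/actors0.json
```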
I can see that the time_zone table from the MySQL DB is copied to the Cloud Storage bucket airflow_1. But when Airflow tries to load the data from Cloud Storage into BigQuery, it fails with a "not found" error for an object in that bucket. Here are the log details:
[2018-03-12 02:16:59,031] {models.py:167} INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:16:59,601] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run mysql_to_gcs load_bq_time_zone 2018-03-12T02:16:48.974591 --job_id 465 --raw -sd DAGS_FOLDER/mysql_to_gcs.py']
[2018-03-12 02:16:59,822] {base_task_runner.py:95} INFO - Subtask: /usr/local/lib/python2.7/dist-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-03-12 02:16:59,822] {base_task_runner.py:95} INFO - Subtask: """)
[2018-03-12 02:16:59,858] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,857] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-03-12 02:16:59,949] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,949] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-03-12 02:16:59,973] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,973] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-03-12 02:17:00,157] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,157] {models.py:167} INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:17:00,712] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,712] {models.py:1126} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,720] {models.py:1126} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,721] {models.py:1318} INFO -
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 2
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask:
[2018-03-12 02:17:00,738] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,737] {models.py:1342} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): load_bq_time_zone> on 2018-03-12 02:16:48.974591
[2018-03-12 02:17:00,792] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,792] {gcp_api_base_hook.py:81} INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {__init__.py:44} WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: from . import file_cache
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: 'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {discovery.py:274} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/storage/v1/rest
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {transport.py:157} INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:00,838] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,838] {client.py:777} INFO - Refreshing access_token
[2018-03-12 02:17:00,910] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,909] {discovery.py:868} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/airflow_1/o/%2Fschemas%2Factors.json?alt=media
[2018-03-12 02:17:01,209] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,208] {gcp_api_base_hook.py:81} INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:01,210] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {__init__.py:44} WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,210] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: from . import file_cache
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: 'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {discovery.py:274} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2018-03-12 02:17:01,212] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {transport.py:157} INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:01,248] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,248] {client.py:777} INFO - Refreshing access_token
[2018-03-12 02:17:01,325] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,325] {bigquery_hook.py:961} INFO - project not included in destination_project_dataset_table: airflow.mysql_time_zone; using project "bigquery-1210"
[2018-03-12 02:17:01,339] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,339] {discovery.py:868} INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs?alt=json
[2018-03-12 02:17:01,888] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,887] {discovery.py:868} INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L?alt=json
[2018-03-12 02:17:02,064] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,062] {models.py:1417} ERROR - BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': 
u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask: result = task_copy.execute(context=context)
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/gcs_to_bq.py", line 153, in execute
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask: schema_update_options=self.schema_update_options)
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 476, in run_load
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask: return self.run_with_configuration(configuration)
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 513, in run_with_configuration
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask: job['status']['errorResult'], job
[2018-03-12 02:17:02,069] {base_task_runner.py:95} INFO - Subtask: Exception: BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}
[2018-03-12 02:17:02,069] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,068] {models.py:1433} INFO - Marking task as UP_FOR_RETRY
[2018-03-12 02:17:02,087] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,085] {models.py:1462} ERROR - BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': 
u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}
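One thing I noticed in the failed job's configuration: sourceUris lists every single character of the object path as its own URI (gs://airflow_1/p, gs://airflow_1/u, gs://airflow_1/s, ...). That looks like what happens when a plain string is iterated where a list is expected. A minimal sketch of that behaviour in plain Python (the loop and names here are my own illustration, not the operator's actual code):

```python
# If the operator builds URIs by iterating source_objects, e.g.
#   for obj in source_objects: 'gs://{}/{}'.format(bucket, obj)
# then a str instead of a list yields one URI per character.
bucket = 'airflow_1'
source_objects = 'actors/actors0.json'  # a str, not a list

uris = ['gs://{}/{}'.format(bucket, obj) for obj in source_objects]
print(uris[:3])  # ['gs://airflow_1/a', 'gs://airflow_1/c', 'gs://airflow_1/t']

# Wrapping the path in a list would produce a single, intact URI:
uris_fixed = ['gs://{}/{}'.format(bucket, obj)
              for obj in ['actors/actors0.json']]
print(uris_fixed)  # ['gs://airflow_1/actors/actors0.json']
```

Could this be why BigQuery reports single-character URIs like gs://airflow_1/t as not found, even though the bucket and the exported file definitely exist?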