
I have the following Airflow code, which reads tables from a MySQL database into Google Cloud Storage and then loads them into Google BigQuery.

I have configured the connection details, along with the service accounts for both MySQL and GCP, in the Admin tab of the Airflow UI.

extract = MySqlToGoogleCloudStorageOperator(
    task_id='extract_actors',
    mysql_conn_id='factset_test',
    google_cloud_storage_conn_id='gcp_test',
    sql='SELECT * FROM mysql.time_zone',
    bucket='airflow_1',
    filename='actors/actors{}.json',
    schema_filename='schemas/actors.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_bq_time_zone",
    bigquery_conn_id='gcp_test',
    google_cloud_storage_conn_id='gcp_test',
    bucket='airflow_1',
    destination_project_dataset_table="airflow.mysql_time_zone",
    source_objects='actors/actors0.json',
    schema_object='schemas/actors.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)

I can see that the table time_zone from the MySQL db is getting copied to the Cloud Storage bucket airflow_1. But when Airflow tries to copy the data from Cloud Storage to BigQuery, it complains that it cannot find the Cloud Storage bucket. Following are the log details:

[2018-03-12 02:16:59,031] {models.py:167} INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:16:59,601] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run mysql_to_gcs load_bq_time_zone 2018-03-12T02:16:48.974591 --job_id 465 --raw -sd DAGS_FOLDER/mysql_to_gcs.py']
[2018-03-12 02:16:59,822] {base_task_runner.py:95} INFO - Subtask: /usr/local/lib/python2.7/dist-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-03-12 02:16:59,822] {base_task_runner.py:95} INFO - Subtask:   """)
[2018-03-12 02:16:59,858] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,857] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-03-12 02:16:59,949] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,949] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-03-12 02:16:59,973] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,973] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-03-12 02:17:00,157] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,157] {models.py:167} INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:17:00,712] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,712] {models.py:1126} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,720] {models.py:1126} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,721] {models.py:1318} INFO - 
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 2
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask: 
[2018-03-12 02:17:00,738] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,737] {models.py:1342} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): load_bq_time_zone> on 2018-03-12 02:16:48.974591
[2018-03-12 02:17:00,792] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,792] {gcp_api_base_hook.py:81} INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {__init__.py:44} WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask:     from . import file_cache
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask:     'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {discovery.py:274} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/storage/v1/rest
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {transport.py:157} INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:00,838] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,838] {client.py:777} INFO - Refreshing access_token
[2018-03-12 02:17:00,910] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,909] {discovery.py:868} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/airflow_1/o/%2Fschemas%2Factors.json?alt=media
[2018-03-12 02:17:01,209] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,208] {gcp_api_base_hook.py:81} INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:01,210] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {__init__.py:44} WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,210] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask:     from . import file_cache
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask:     'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {discovery.py:274} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2018-03-12 02:17:01,212] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {transport.py:157} INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:01,248] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,248] {client.py:777} INFO - Refreshing access_token
[2018-03-12 02:17:01,325] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,325] {bigquery_hook.py:961} INFO - project not included in destination_project_dataset_table: airflow.mysql_time_zone; using project "bigquery-1210"
[2018-03-12 02:17:01,339] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,339] {discovery.py:868} INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs?alt=json
[2018-03-12 02:17:01,888] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,887] {discovery.py:868} INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L?alt=json
[2018-03-12 02:17:02,064] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,062] {models.py:1417} ERROR - BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/gcs_to_bq.py", line 153, in execute
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask:     schema_update_options=self.schema_update_options)
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 476, in run_load
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask:     return self.run_with_configuration(configuration)
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 513, in run_with_configuration
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask:     job['status']['errorResult'], job
[2018-03-12 02:17:02,069] {base_task_runner.py:95} INFO - Subtask: Exception: BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}
[2018-03-12 02:17:02,069] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,068] {models.py:1433} INFO - Marking task as UP_FOR_RETRY
[2018-03-12 02:17:02,087] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,085] {models.py:1462} ERROR - BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}
Cloud
  • I am unable to see a problem with finding the bucket in the log. What I can see, though, is some kind of auth error, more specifically here: https://stackoverflow.com/questions/40154672/importerror-file-cache-is-unavailable-when-using-python-client-for-google-ser/44518587 – tobi6 Mar 12 '18 at 09:00
  • If you look at the last line of the log: ERROR - BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. – Cloud Mar 12 '18 at 11:36
  • Also, Airflow is not complaining about an auth error when it writes the MySQL table to Cloud Storage, so I am not sure why it would complain when reading the data from Cloud Storage into BigQuery. – Cloud Mar 12 '18 at 11:59

3 Answers


The problem is source_objects='actors/actors0.json'.

It needs to be source_objects=['actors/actors0.json'].

From the documentation:

:param source_objects: List of Google cloud storage URIs to load from. (templated)
    If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
:type object: list

And here's the code:

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
                   for source_object in self.source_objects]

So it iterates through whatever you pass in; a plain string is iterated character by character, which is exactly why the log shows one-character source URIs such as gs://airflow_1/t.
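
For example, with the rest of the question's parameters left unchanged, the load task would become something like this (a sketch, not tested against the poster's environment):

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_bq_time_zone",
    bigquery_conn_id='gcp_test',
    google_cloud_storage_conn_id='gcp_test',
    bucket='airflow_1',
    destination_project_dataset_table="airflow.mysql_time_zone",
    # must be a list, even for a single object -- a bare string
    # gets iterated character by character, as seen in the log
    source_objects=['actors/actors0.json'],
    schema_object='schemas/actors.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)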

Andrei
  • true, source_objects is of type list; this solved my issue, I was stuck on this for hours: source_objects=['my_file.csv'] – user3065757 Feb 12 '21 at 19:38

The source_objects parameter expects a list, i.e. source_objects=['actors/actors0.json'], since it is possible to load more than one file at a time.
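
For instance, if the MySQL export had been chunked into several files (the extract operator only produces extra chunks when its file size limit is exceeded, so actors1.json here is hypothetical), they could all be loaded in one job:

source_objects=['actors/actors0.json', 'actors/actors1.json']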

It is always best to check the operator's code for the field definitions, e.g. https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/contrib/operators/gcs_to_bq.py#L65

Blakey

Looking at the error message, the problem is that the bucket you specified, airflow_1, does not exist. In fact, I tried to create a bucket with this name in my project and the name was available (a bucket name must be globally unique across Cloud Storage).

Also, be aware that BigQuery does not support source URIs that include multiple consecutive slashes after the initial double slash.
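
If you want to rule that out, one quick check is with the google-cloud-storage Python client (a sketch; it assumes the google-cloud-storage package is installed and the same service account key is available via GOOGLE_APPLICATION_CREDENTIALS):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('airflow_1')
print(bucket.exists())                              # does the bucket exist?
print(bucket.blob('actors/actors0.json').exists())  # was the export written?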

MonicaPC