I receive daily CSV files from Google Compute Engine into my Cloud Storage bucket, and I wrote a Cloud Function that loads each file's data into a BigQuery table. That part works well. However, I also need to include the file's creation time and update time, taken from the CSV file's metadata, as columns, either before the data is sent to BigQuery or while it is being loaded.
Is this possible in a Cloud Function, and how can I do it? I would appreciate some kind of example to guide me.
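To make it concrete, this is roughly what I imagine, given that the event payload my function receives already contains timeCreated and updated fields. It is only an untested sketch; the function name, the new column names, and the idea of rewriting the CSV in memory are my own guesses:

import io
import csv
from google.cloud import storage
from google.cloud import bigquery

def load_with_file_times(data, context):
    # Read the uploaded CSV back out of the bucket.
    blob = storage.Client().bucket(data['bucket']).blob(data['name'])
    source = io.StringIO(blob.download_as_string().decode('utf-8'))

    # Append the creation and update times from the event payload to
    # every row (the header row also gets them, but the load skips it).
    rewritten = io.StringIO()
    writer = csv.writer(rewritten, delimiter=';')
    for row in csv.reader(source, delimiter=';'):
        writer.writerow(row + [data['timeCreated'], data['updated']])

    # Load the rewritten CSV from memory instead of from the gs:// URI.
    # The job config would be the same as in my function below, plus two
    # extra STRING SchemaFields for the new columns.
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig()
    job_config.skip_leading_rows = 1
    job_config.field_delimiter = ';'
    job_config.source_format = bigquery.SourceFormat.CSV
    table_ref = client.dataset('My-dataset').table('employee_time')
    client.load_table_from_file(
        io.BytesIO(rewritten.getvalue().encode('utf-8')),
        table_ref, job_config=job_config).result()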
# My current code in the Cloud Function:
import os
from google.cloud import bigquery
GCP_PROJECT = os.environ.get('GCP_PROJECT')
def FlexToBigQuery(data, context):
    bucketname = data['bucket']
    filename = data['name']
    timeCreated = data['timeCreated']

    client = bigquery.Client()
    dataset_id = 'My-dataset'
    dataset_ref = client.dataset(dataset_id)

    # Configure the CSV load job.
    job_config = bigquery.LoadJobConfig()
    job_config.skip_leading_rows = 1
    job_config.field_delimiter = ';'
    job_config.allow_jagged_rows = True
    job_config.allow_quoted_newlines = True
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.schema = [
        bigquery.SchemaField('Anstallningsnummer', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Kostnadsstalle', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('OB_tidkod', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Dagsschema', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Schemalagd_arbetstid', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_narvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Summa_franvaro', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_klarmarkering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Datum_for_attestering', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Frislappsdatum', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Export_klockslag', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('Vecka', 'STRING', mode='NULLABLE')
    ]
    # Build the gs:// URI for the uploaded CSV from the event payload.
    uri = 'gs://%s/%s' % (bucketname, filename)
    print('Received file "%s" at %s.' % (uri, timeCreated))
    # Start the load job and wait for it to finish.
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table('employee_time'),
        job_config=job_config)
    print('Starting job with ID {}'.format(load_job.job_id))
    print('File: {}'.format(data['name']))
    load_job.result()  # wait for the table load to complete
    print('Job finished.')

    destination_table = client.get_table(dataset_ref.table('employee_time'))
    print('Loaded {} rows.'.format(destination_table.num_rows))
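The other idea I had was to keep the load exactly as it is but point it at a staging table, then write the final table with a query that attaches the two timestamps as constant columns. Again only a sketch; the staging table employee_time_staging and the column names are made up:

from google.cloud import bigquery

def add_file_times_with_query(client, time_created, time_updated):
    # Rewrite the final table from the staging table, attaching the two
    # metadata values as constant columns via query parameters.
    job_config = bigquery.QueryJobConfig()
    job_config.destination = client.dataset('My-dataset').table('employee_time')
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job_config.query_parameters = [
        bigquery.ScalarQueryParameter('time_created', 'STRING', time_created),
        bigquery.ScalarQueryParameter('time_updated', 'STRING', time_updated),
    ]
    sql = ('SELECT *, @time_created AS time_created, '
           '@time_updated AS time_updated '
           'FROM `My-dataset.employee_time_staging`')
    client.query(sql, job_config=job_config).result()

Would either of these approaches work in a Cloud Function, or is there a better way?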