I upload 10 files every day at 11 p.m with a Cron Job to a bucket on GCS. Each file is a .csv
with a size from 2
to 30 KB
. The file name is always YYYY-MM-DD-ID.csv
A Cloud Function is called everytime I am uploading a file into that bucket to send those .csv
files to BigQuery. The trigger type is Cloud Storage
on finalise/create
events.
My issue is the following:
On BigQuery, each value for each row/columns is multiplied by a multiple. Sometimes it's 1 (so the value is the same), often 2 and sometimes 3. I attached one example bellow with the difference between BigQuery (BQ) and Google Cloud Storage (GCS).
It seems that the cloud function is called multiple times. It's not on the code but rather some duplicate message deliveries from the Cloud Function during the trigger. When I am going o the logs tab for today, I can see the Cloud Function upload_to_bigquery
has been called multiple times.
I have tried to fix it but I made a mistake. I thought we could write temporary files to Cloud Functions but we can not. My solution was to write the filename I am uploading to BigQuery on a .txt file. And before to upload a new file on BigQuery, read that .txt file and check if the current file exist on that list. If the filename is already present, skip. Else, write the .txt filename to the list and do my stuff.
if file_to_upload not in text:
text.append(file_to_upload)
with open("all_uploaded_files.txt", "w") as text_file:
for item in text:
text_file.write(item + "\n")
bucket = storage_client.bucket('sfr-test-data')
blob = bucket.blob("all_uploaded_files.txt")
blob.upload_from_filename("all_uploaded_files.txt")
## do my things here
else:
print("file already uploaded")
# skip to new file to upload
But even if I could do that, this solution is not viable. The temporary file will become so large after months of years that it would be a mess. Do you know whats the easiest way to fix this issue?
Cloud Function: upload_to_big_query - main.py
BUCKET = "xxx"
GOOGLE_PROJECT = "xxx"
HEADER_MAPPING = {
"Source/Medium": "source_medium",
"Campaign": "campaign",
"Last Non-Direct Click Conversions": "last_non_direct_click_conversions",
"Last Non-Direct Click Conversion Value": "last_non_direct_click_conversion_value",
"Last Click Prio Conversions": "last_click_prio_conversions",
"Last Click Prio Conversion Value": "last_click_prio_conversion_value",
"Data-Driven Conversions": "dda_conversions",
"Data-Driven Conversion Value": "dda_conversion_value",
"% Change in Conversions from Last Non-Direct Click to Last Click Prio": "last_click_prio_vs_last_click",
"% Change in Conversions from Last Non-Direct Click to Data-Driven": "dda_vs_last_click"
}
SPEND_HEADER_MAPPING = {
"Source/Medium": "source_medium",
"Campaign": "campaign",
"Spend": "spend"
}
tables_schema = {
"google-analytics": [
bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
bigquery.SchemaField("goal", bigquery.enums.SqlTypeNames.STRING, mode='REQUIRED'),
bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
bigquery.SchemaField("last_non_direct_click_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
bigquery.SchemaField("last_non_direct_click_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
bigquery.SchemaField("last_click_prio_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
bigquery.SchemaField("last_click_prio_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
bigquery.SchemaField("dda_conversions", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
bigquery.SchemaField("dda_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
bigquery.SchemaField("last_click_prio_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
bigquery.SchemaField("dda_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE')
],
"google-analytics-spend": [
bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
bigquery.SchemaField("spend", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
]
}
def download_from_gcs(file):
client = storage.Client()
bucket = client.get_bucket(BUCKET)
blob = bucket.get_blob(file['name'])
file_name = os.path.basename(os.path.normpath(file['name']))
blob.download_to_filename(f"/tmp/{file_name}")
return file_name
def load_in_bigquery(file_object, dataset: str, table: str):
client = bigquery.Client()
table_id = f"{GOOGLE_PROJECT}.{dataset}.{table}"
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
autodetect=True,
schema=tables_schema[table]
)
job = client.load_table_from_file(file_object, table_id, job_config=job_config)
job.result() # Wait for the job to complete.
def __order_columns(df: pd.DataFrame, spend=False) ->pd.DataFrame:
# We want to have source and medium columns at the third position
# for a spend data frame and at the fourth postion for others df
# because spend data frame don't have goal column.
pos = 2 if spend else 3
cols = df.columns.tolist()
cols[pos:2] = cols[-2:]
cols = cols[:-2]
return df[cols]
def __common_transformation(df: pd.DataFrame, date: str, goal: str) -> pd.DataFrame:
# for any kind of dataframe, we add date and week columns
# based on the file name and we split Source/Medium from the csv
# into two different columns
week_of_the_year = datetime.strptime(date, '%Y-%m-%d').isocalendar()[1]
df.insert(0, 'date', date)
df.insert(1, 'week', week_of_the_year)
mapping = SPEND_HEADER_MAPPING if goal == "spend" else HEADER_MAPPING
print(df.columns.tolist())
df = df.rename(columns=mapping)
print(df.columns.tolist())
print(df)
df["source_medium"] = df["source_medium"].str.replace(' ', '')
df[["source", "medium"]] = df["source_medium"].str.split('/', expand=True)
df = df.drop(["source_medium"], axis=1)
df["week"] = df["week"].astype(int, copy=False)
return df
def __transform_spend(df: pd.DataFrame) -> pd.DataFrame:
df["spend"] = df["spend"].astype(float, copy=False)
df = __order_columns(df, spend=True)
return df[df.columns[:6]]
def __transform_attribution(df: pd.DataFrame, goal: str) -> pd.DataFrame:
df.insert(2, 'goal', goal)
df["last_non_direct_click_conversions"] = df["last_non_direct_click_conversions"].astype(int, copy=False)
df["last_click_prio_conversions"] = df["last_click_prio_conversions"].astype(int, copy=False)
df["dda_conversions"] = df["dda_conversions"].astype(float, copy=False)
return __order_columns(df)
def transform(df: pd.DataFrame, file_name) -> pd.DataFrame:
goal, date, *_ = file_name.split('_')
df = __common_transformation(df, date, goal)
# we only add goal in attribution df (google-analytics table).
return __transform_spend(df) if "spend" in file_name else __transform_attribution(df, goal)
def main(event, context):
"""Triggered by a change to a Cloud Storage bucket.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
file = event
file_name = download_from_gcs(file)
df = pd.read_csv(f"/tmp/{file_name}")
transformed_df = transform(df, file_name)
with open(f"/tmp/bq_{file_name}", "w") as file_object:
file_object.write(transformed_df.to_csv(index=False))
with open(f"/tmp/bq_{file_name}", "rb") as file_object:
table = "google-analytics-spend" if "spend" in file_name else "google-analytics"
load_in_bigquery(file_object, dataset='attribution', table=table)