
I've been trying to compress my CSV files to .gz before uploading them to GCS using a Cloud Function (Python 3.7), but my code only adds the .gz extension and doesn't actually compress the file, so in the end the file is corrupted. Can you please show me how to fix this? Thanks

Here is part of my code:

import gzip
import time

from google.cloud import bigquery, storage


def to_gcs(request):
    job_config = bigquery.QueryJobConfig()
    gcs_filename = 'filename_{}.csv'
    bucket_name = 'bucket_gcs_name'
    subfolder = 'subfolder_name'
    client = bigquery.Client()

    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    QUERY = "SELECT * FROM `bigquery-public-data.google_analytics_sample.ga_sessions_*` session, UNNEST(hits) AS hits"
    query_job = client.query(
        QUERY,
        location='US',
        job_config=job_config)

    while not query_job.done():
        time.sleep(1)

    rows_df = query_job.result().to_dataframe()
    storage_client = storage.Client()

    storage_client.get_bucket(bucket_name).blob(subfolder + '/' + gcs_filename + '.gz').upload_from_string(
        rows_df.to_csv(sep='|', index=False, encoding='utf-8', compression='gzip'),
        content_type='application/octet-stream')

Justine
  • Does this answer your question? [Write pandas dataframe as compressed CSV directly to Amazon s3 bucket?](https://stackoverflow.com/questions/43729224/write-pandas-dataframe-as-compressed-csv-directly-to-amazon-s3-bucket) – Sam Mason Dec 16 '19 at 13:49
  • you should check the warnings you're getting from Pandas, see https://stackoverflow.com/a/44168817/1358308 and https://github.com/pandas-dev/pandas/issues/22555 – Sam Mason Dec 16 '19 at 13:50
  • The most voted answer from @SamMason's first comment did work for me. Did that work for you, @Justine? – Jose V Dec 17 '19 at 10:18
  • @Jose V, it does! – Justine Dec 18 '19 at 15:33
  • Hi, I posted an answer as community wiki (so I won't get any reputation) with the info from @Sam Mason's comment. If the answer solves your issue, please consider [accepting and/or upvoting](https://stackoverflow.com/help/someone-answers) it so that the community sees it. Feel free to edit it if you see fit. – Jose V Dec 19 '19 at 11:07
  • @JoseV I've had a fiddle and added a note about using the `tempfile` module. Also, the [`upload_from_string` method](https://googleapis.dev/python/storage/latest/_modules/google/cloud/storage/blob.html#Blob.upload_from_string) immediately creates a `BytesIO` object, so it's better to pass a file object if possible, which is now trivial. – Sam Mason Dec 19 '19 at 14:50

2 Answers


As suggested in the thread referred to by @Sam Mason in a comment, once you have obtained the Pandas DataFrame, you should use a `TextIOWrapper()` and `BytesIO()` as described in the following sample:

The following sample was inspired by @ramhiser's answer in this SO thread.

from io import BytesIO, TextIOWrapper
import gzip

df = query_job.result().to_dataframe()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(f'{subfolder}/{gcs_filename}.gz')

with BytesIO() as gz_buffer:
    # write the gzipped CSV into the in-memory buffer
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        df.to_csv(TextIOWrapper(gz_file, 'utf8'), index=False)

    gz_buffer.seek(0)  # rewind the buffer so the upload starts at the beginning
    blob.upload_from_file(gz_buffer,
        content_type='application/octet-stream')

Also note that if you expect this file to ever get larger than a couple of MB, you are probably better off using something from the `tempfile` module in place of `BytesIO`. `SpooledTemporaryFile` is basically designed for this use case: it keeps the data in an in-memory buffer up to a given size and only spills to disk if the file gets really big.
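
A minimal sketch of that variant, assuming the same `df` and `blob` as above and an arbitrary 10 MB spool size (the size and variable names are just illustrative), could look like this:

from io import TextIOWrapper
from tempfile import SpooledTemporaryFile
import gzip

# keep up to ~10 MB in memory; larger output is spooled to a temporary file on disk
with SpooledTemporaryFile(max_size=10 * 1024 * 1024) as gz_buffer:
    with gzip.GzipFile(mode='w', fileobj=gz_buffer) as gz_file:
        df.to_csv(TextIOWrapper(gz_file, 'utf8'), index=False)

    gz_buffer.seek(0)  # rewind before uploading
    blob.upload_from_file(gz_buffer,
        content_type='application/octet-stream')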

Sam Mason
Jose V
  • For people facing `ValueError: Stream must be at beginning`, insert `gz_buffer.seek(0)` before the `blob.upload_from_file(...)` line. – Yoyo Jul 09 '21 at 07:46

Hi, I tried to reproduce your use case:

  1. I created a Cloud Function using this quickstart link:

    def hello_world(request):
        from google.cloud import bigquery
        from google.cloud import storage
        import pandas as pd  # pandas must be installed for to_dataframe()

        client = bigquery.Client()
        storage_client = storage.Client()

        # /tmp is the only writable path inside a Cloud Function
        path = '/tmp/file.gz'

        query_job = client.query("""
            SELECT
              CONCAT(
                'https://stackoverflow.com/questions/',
                CAST(id as STRING)) as url,
              view_count
            FROM `bigquery-public-data.stackoverflow.posts_questions`
            WHERE tags like '%google-bigquery%'
            ORDER BY view_count DESC
            LIMIT 10""")

        results = query_job.result().to_dataframe()
        # writing to a real file path, so compression='gzip' is honored
        results.to_csv(path, sep='|', index=False, encoding='utf-8', compression='gzip')

        bucket = storage_client.get_bucket('mybucket')
        blob = bucket.blob('file.gz')
        blob.upload_from_filename(path)
    
  2. This is the requirements.txt:

      # Function dependencies, for example:
      
      google-cloud-bigquery
      google-cloud-storage
      pandas
      
  3. I deployed the function (an example `gcloud` command is shown after this list).

  4. I checked the output:

      gsutil cp gs://mybucket/file.gz file.gz
      gzip -d file.gz
      cat file
      
      
      #url|view_count
      https://stackoverflow.com/questions/22879669|52306
      https://stackoverflow.com/questions/13530967|46073
      https://stackoverflow.com/questions/35159967|45991
      https://stackoverflow.com/questions/10604135|45238
      https://stackoverflow.com/questions/16609219|37758
      https://stackoverflow.com/questions/11647201|32963
      https://stackoverflow.com/questions/13221978|32507
      https://stackoverflow.com/questions/27060396|31630
      https://stackoverflow.com/questions/6607552|31487
      https://stackoverflow.com/questions/11057219|29069
      
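For reference, a deployment command for step 3 along these lines should work. This is only a sketch: the function name `hello_world` comes from the snippet above, and you may need to adjust the project, region, and authentication flags to your setup.

    gcloud functions deploy hello_world --runtime python37 --trigger-http
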
marian.vladoi