
I have a relatively simple Gen2 Cloud Function, which is deployed using Cloud Run. Regardless of how many vCPUs I assign, DuckDB seems to use only 1 CPU. Memory works fine (I checked using the Metrics dashboard). Any idea what's wrong?

import duckdb
import json
import pandas as pd

def download_all_blobs_with_transfer_manager(
    bucket_name, destination_directory="", threads=4
):
    from google.cloud.storage import Client, transfer_manager

    storage_client = Client('yyyy')
    bucket = storage_client.bucket(bucket_name)

    blob_names = [blob.name for blob in bucket.list_blobs()]

    # Download every blob in the bucket concurrently.
    results = transfer_manager.download_many_to_path(
        bucket, blob_names, destination_directory=destination_directory, threads=threads
    )

    for name, result in zip(blob_names, results):
        # The transfer manager returns an Exception object for each failed download.
        if isinstance(result, Exception):
            print("Failed to download {} due to exception: {}".format(name, result))
        else:
            print("Downloaded {} to {}.".format(name, destination_directory + name))

download_all_blobs_with_transfer_manager('xxxxx', "./data", threads=8)

duckdb.query("install httpfs; load httpfs; PRAGMA enable_object_cache; SET enable_http_metadata_cache=true")

def Query(request):
    SQL = request.get_json().get('name')
    try:
        df = duckdb.execute(SQL).df()
    except Exception as er:
        df = pd.DataFrame([{'error': er}])
    return json.dumps(df.to_json(orient="records")), 200, {'Content-Type': 'application/json'}
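For reference, DuckDB's own thread budget can be checked and pinned explicitly; a minimal sketch using DuckDB's standard threads setting (the value 8 is just an example):

import duckdb

# Ask DuckDB how many worker threads it thinks it can use.
print(duckdb.execute("SELECT current_setting('threads')").fetchone())

# Pin the thread count explicitly; DuckDB parallelizes queries across these threads.
duckdb.execute("SET threads TO 8")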
Mim
  • Do you expect to process several requests on the same Cloud Functions instance? If so, which generation of Cloud Functions do you use? If not, do you want to multi-thread the file processing on Cloud Functions? Can you clarify your expectations? – guillaume blaquiere Feb 13 '23 at 10:45
  • Just one request per container; I expect the function to be multi-threaded. – Mim Feb 13 '23 at 11:42
  • It should work. I'm bad at Python, but I already tested the behavior with Go, with 1 CPU or more. – guillaume blaquiere Feb 13 '23 at 12:21

1 Answer


Increase the number of threads used in the 'download_all_blobs_with_transfer_manager' function; more threads will improve the download throughput of the function.

Furthermore, you can try a different method for downloading blobs: the 'concurrent.futures' library can launch multiple download tasks in parallel, as in the sketch below. For reference, see the concurrent.futures documentation.
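A minimal sketch of that approach (the helper names download_one and download_all_blobs_concurrent are hypothetical, 'yyyy'/'xxxxx' are the same placeholders as in the question, and blob.download_to_filename is the standard per-blob download call):

import concurrent.futures
import os
from google.cloud.storage import Client

def download_one(bucket, blob_name, destination_directory):
    # Standard per-blob download: stream the blob into a local file.
    path = os.path.join(destination_directory, blob_name)
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    bucket.blob(blob_name).download_to_filename(path)
    return path

def download_all_blobs_concurrent(bucket_name, destination_directory="", max_workers=8):
    bucket = Client('yyyy').bucket(bucket_name)
    blob_names = [blob.name for blob in bucket.list_blobs()]

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_name = {
            executor.submit(download_one, bucket, name, destination_directory): name
            for name in blob_names
        }
        for future in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[future]
            try:
                print("Downloaded {} to {}.".format(name, future.result()))
            except Exception as er:
                print("Failed to download {} due to exception: {}".format(name, er))

download_all_blobs_concurrent('xxxxx', "./data", max_workers=8)

Threads are usually sufficient here because downloads are I/O-bound and the GIL is released during network reads.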

Code sample for reference: I have combined the multiprocessing library with DuckDB.

import duckdb
import json
import pandas as pd
import multiprocessing

from google.cloud.storage import Client

def download_blob(bucket_name, blob_name, destination_directory):
    # Each worker process builds its own client; clients cannot be shared across processes.
    storage_client = Client('yyyy')
    bucket = storage_client.bucket(bucket_name)

    try:
        # Per-blob download via the standard Blob API.
        bucket.blob(blob_name).download_to_filename(destination_directory + blob_name)
    except Exception as er:
        print("Failed to download {} due to exception: {}".format(blob_name, er))
    else:
        print("Downloaded {} to {}.".format(blob_name, destination_directory + blob_name))

def download_all_blobs_with_transfer_manager(bucket_name, destination_directory="", threads=4):
    storage_client = Client('yyyy')
    bucket = storage_client.bucket(bucket_name)

    blob_names = [blob.name for blob in bucket.list_blobs()]

    with multiprocessing.Pool(processes=threads) as pool:
        results = [
            pool.apply_async(download_blob, (bucket_name, name, destination_directory))
            for name in blob_names
        ]
        # Block until every download has completed.
        for result in results:
            result.get()

download_all_blobs_with_transfer_manager('xxxxx', "./data", threads=8)

duckdb.query("install httpfs; load httpfs; PRAGMA enable_object_cache; SET enable_http_metadata_cache=true")

def Query(request):
    SQL = request.get_json().get('name')
    try:
        df = duckdb.execute(SQL).df()
    except Exception as er:
        df = pd.DataFrame([{'error': er}])
    return json.dumps(df.to_json(orient="records")), 200, {'Content-Type': 'application/json'}
Robina Mirbahar