In my Google Cloud Function (Python 3.7 runtime), I have a function that tries to read all the .csv files in a Google Cloud Storage bucket into a single pandas DataFrame (df). Once the data is in the DataFrame, I plan to do some light ETL work on it and then write it back out as one large .csv file to another bucket.
The problem occurs when I pass the objects (downloaded with file.download_as_string()) to read_csv(): I get an error related to io.StringIO (TypeError: initial_value must be str or None, not bytes).

In read_csv(io.StringIO(file_contents), ...), does this have something to do with where I have placed io.StringIO? Can anyone help me correct this error?

    def stage1slemonthly(data, context, source_bucket = 'my_source_bucket', 
    destination_bucket = 'gs://my destination_bucket'):  


        from google.cloud import storage
        import pandas as pd
        import pyspark
        from pyspark.sql import SQLContext
        import io

        storage_client = storage.Client()

        # source_bucket = data['bucket']
        # source_file = data['name']
        source_bucket = storage_client.bucket(source_bucket)

        # load in the col names
        col_names = ["Customer_Country_Number", "Customer_Name", "Exclude",
             "SAP_Product_Name", "CP_Sku_Code", "Exclude", "UPC_Unit",
             "UPC_Case", "Colgate_Month_Year", "Total_Cases",
             "Promoted_Cases", "Non_Promoted_Cases",
             "Planned_Non_Promoted_Cases", "Exclude",
             "Lead_Measure", "Tons", "Pieces", "Liters",
             "Tons_Conv_Revenue", "Volume_POS_Units", "Scan_Volume",
             "WWhdrl_Volume", "Exclude", "Exclude", "Exclude", "Exclude",
             "Exclude", "Exclude", "Exclude", "Exclude", "Investment_Buy",
             "Exclude", "Exclude", "Gross_Sales", "Claim_Sales",
             "Adjusted_Gross_Sales", "Exclude", "Exclude",
             "Consumer_Investment", "Consumer_Allowance",
             "Special_Pack_FG", "Coupons", "Contests_Offers", 
             "Consumer_Price_Reduction", "Permanent_Price_Reduction",
             "Temporary_Price_Reduction", "TPR_Off_Invoice", "TPR_Scan",
             "TPR_WWdrwl_Exfact", "Every_Day_Low_Price", "Closeouts",
             "Inventory_Price_Reduction", "Exclude", "Customer_Investment",
             "Prompt_Payment", "Efficiency_Drivers", "Efficient_Logistics",
             "Efficient_Management", "Business_Builders_Direct", "Assortment",
             "Customer_Promotions","Customer_Promotions_Terms",
             "Customer_Promotions_Fixed", "Growth_Direct",
             "New_Product_Incentives", "Free_Goods_Direct",
             "Shopper_Marketing", "Business_Builders_Indirect",
             "Middleman_Performance", "Middleman_Infrastructure",
             "Growth_Indirect", "Indirect_Retailer_Investments",
             "Free_Goods_Indirect", "Other_Customer_Investments",
             "Product_Listing_Allowances", "Non_Performance_Trade_Payments",
             "Exclude", "Variable_Rebate_Adjustment", 
             "Overlapping_OI_Adjustment", "Fixed_Accruals",
             "Variable_Accruals", "Total_Accruals", "Gross_To_Net",
             "Invoiced_Sales", "Exclude", "Exclude", "Net_Sales",
             "Exclude", "Exclude", "Exclude", "Exclude", "Exclude",
             "Exclude", "Exclude", "Exclude", "Exclude",
             "Total_Variable_Cost", "Margin", "Exclude"]

        df = pd.DataFrame(columns=[col_names])

        for file in list(source_bucket.list_blobs()):
          file_contents = file.download_as_string() 
          df = df.append(pd.read_csv(io.StringIO(file_contents), header=None, names=[col_names]))

        df = df.reset_index(drop=True)

        # do ETL work here in future

        sc = pyspark.SparkContext.getOrCreate()
        sqlCtx = SQLContext(sc)
        sparkDf = sqlCtx.createDataFrame(df)
        sparkDf.coalesce(1).write.option("header", "true").csv(destination_bucket)

When I run it, I get the following error message:

    Traceback (most recent call last):
      File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 383, in run_background_function
        _function_handler.invoke_user_function(event_object)
      File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 217, in invoke_user_function
        return call_user_function(request_or_event)
      File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 214, in call_user_function
        event_context.Context(**request_or_event.context))
      File "/user_code/main.py", line 56, in stage1slemonthly
        df = df.append(pd.read_csv(io.StringIO(file_contents), header=None, names=[col_names]))
    TypeError: initial_value must be str or None, not bytes

tawab_shakeel
jwlon81

1 Answer

You get this error because file.download_as_string() returns bytes, not str, and io.StringIO does not accept bytes as its initial_value (here, file_contents).
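
To make the bytes/str mismatch concrete, here is a minimal sketch that needs no GCS access. The `file_contents` value is a made-up stand-in for what `download_as_string()` returns, and the UTF-8 encoding is an assumption about the files. Either buffer shown below could be handed to pd.read_csv, which accepts both text and binary file-like objects; the csv module is used only to keep the sketch dependency-free.

```python
import csv
import io

# Hypothetical stand-in for the bytes returned by blob.download_as_string().
file_contents = b"US01,Acme,10\nUS02,Globex,20\n"

# Passing bytes to io.StringIO reproduces the TypeError from the question.
try:
    io.StringIO(file_contents)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Option 1: decode to str first (assumes the files are UTF-8 encoded).
text_buffer = io.StringIO(file_contents.decode("utf-8"))

# Option 2: keep the bytes and wrap them in io.BytesIO instead.
bytes_buffer = io.BytesIO(file_contents)

# Parse the decoded text to show that the buffer is readable.
rows = list(csv.reader(text_buffer))
print(error_message)
print(rows)
```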

Moreover, col_names is already a list, so pd.DataFrame(columns=[col_names]) and pd.read_csv(..., names=[col_names]) are incorrect: use col_names instead of [col_names].
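
As a quick illustration of why the extra brackets matter, the sketch below builds an empty DataFrame both ways. The two-column `col_names` is a shortened, hypothetical subset of the real list. Wrapping the list in another list makes pandas treat it as one level of a MultiIndex rather than as plain column labels.

```python
import pandas as pd

# Shortened, hypothetical subset of the real column list.
col_names = ["Customer_Name", "Total_Cases"]

# A list of lists is interpreted as the levels of a MultiIndex.
wrapped = pd.DataFrame(columns=[col_names])

# A flat list gives ordinary column labels.
flat = pd.DataFrame(columns=col_names)

print(type(wrapped.columns).__name__)
print(type(flat.columns).__name__)
```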

In any case, downloading each file as a string is not the simplest way to read a CSV file from Google Cloud Storage. You could write this instead:

    from google.cloud import storage
    import pandas as pd

    storage_client = storage.Client()

    source_bucket = storage_client.bucket(source_bucket)

    col_names = ["Customer_Country_Number", "Customer_Name", ...]

    df = pd.DataFrame(columns=col_names)

    for file in list(source_bucket.list_blobs()):
        # Build a gs:// URL so pandas (via gcsfs) reads the object directly
        file_path = "gs://{}/{}".format(file.bucket.name, file.name)
        df = df.append(pd.read_csv(file_path, header=None, names=col_names))

    # the rest of your code

pandas can read files directly from GCS with read_csv, so there is no need to download each file before loading it, but you need to install gcsfs first (pip3 install gcsfs).
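
One caveat for newer environments: DataFrame.append was removed in pandas 2.0, so the loop above only works on older pandas versions such as those available for the Python 3.7 runtime. A possible alternative, sketched here with a hypothetical helper `read_all_csvs` and in-memory buffers standing in for the `gs://` paths, is to collect the frames in a list and concatenate them once.

```python
import io

import pandas as pd


def read_all_csvs(sources, col_names):
    """Read each headerless CSV source and stack the rows into one DataFrame.

    `sources` may hold anything pd.read_csv accepts: file-like objects,
    local paths, or "gs://bucket/name" strings when gcsfs is installed.
    Note that pd.concat raises ValueError if `sources` is empty.
    """
    frames = [pd.read_csv(src, header=None, names=col_names) for src in sources]
    return pd.concat(frames, ignore_index=True)


# Hypothetical sample data standing in for two files in the bucket.
col_names = ["Customer_Name", "Total_Cases"]
sources = [
    io.StringIO("Acme,10\n"),
    io.StringIO("Globex,20\nInitech,30\n"),
]

df = read_all_csvs(sources, col_names)
print(df)
```

Passing ignore_index=True gives the combined frame a fresh 0..n-1 index, which makes a separate reset_index(drop=True) call unnecessary.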

norbjd
  • Thank you for the answer above. The code runs without any errors and produces a csv output in a new GCS bucket as expected. Unfortunately, the problem I have now is that no actual data from the csv files in the original 'source bucket' is coming through into the pandas df. Could this have something to do with how pd.read_csv is reading the file_path? – jwlon81 Jul 02 '19 at 06:54
  • To make sure the issue wasn't related to the way the data was being written from the pandas df to csv (and then into Google Cloud Storage), I created an additional random df and appended it to the df created by read_csv. The output csv was written correctly, but it contained only the randomly generated df. This implies that the problem lies in how my code reads the .csv files at the beginning. – jwlon81 Jul 02 '19 at 07:01
  • I am not sure I understand. For example, if you have 2 csv files in `source_bucket` (one line per file), are you saying that only the line of the last loaded csv file is in the `df` (after the `for` loop)? – norbjd Jul 02 '19 at 10:39
  • Sorry, I didn't explain my new problem very well. To illustrate: in my Google storage bucket `source_bucket`, I have 6 csv files with the same structure that I'm trying to read and append with `df = df.append(pd.read_csv(file_path, header=None, names=col_names))` in order to get one large df. Using pyspark, I write this large df to a .csv file and upload it to `destination_bucket`. The code executes with no errors, but when I open the newly created .csv file in `destination_bucket`, there are no rows of data in it (only the column headers from `col_names`). – jwlon81 Jul 02 '19 at 22:46
  • My theory is that the problem stems either from how the data is being read within the for loop or from the 6 .csv files in the source bucket themselves. I have read that .csv files that were opened and saved with Microsoft Excel have caused issues in the past. It would be interesting to hear your thoughts. – jwlon81 Jul 02 '19 at 22:51
  • It may be a problem with the code **after** the `for` loop. I've tested the code in my answer with 3 csv files (not edited with Excel) with 2 columns on GCS and it works (printing `df` prints the contents of my 3 csv files). I strongly suggest you open a new question with the details you have provided here plus a code snippet, and I'll be glad to help if I can. – norbjd Jul 03 '19 at 08:32
  • norbjd - I went back through your original response to my question and noticed that I had forgotten to remove io.StringIO from my for loop. I have removed it and the code is now working perfectly. Thanks! – jwlon81 Jul 04 '19 at 04:01