
I have followed this post: Write BigQuery results to GCS in CSV format using Apache Beam

Based on it, I wrote the following code to write BigQuery results to GCS in CSV format using Apache Beam:

from __future__ import absolute_import

import argparse
import logging
from datetime import date

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

#import sys
#reload(sys)
#sys.setdefaultencoding('utf8')

today = date.today()
current_date = today.strftime("%Y%m%d")


def createFile(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(
           query="SELECT CUST_ROLE,CUST_ROLE_TXT,CUST_C4C_CUSTID,CUST_FNAME,"
                 "CUST_LNAME,CUST_EMAIL,CUST_MOBILE,DL_BILL_TO_ADD,DL_SHIP_TO_ADD,"
                 "CUST_STATE,CUST_GENDER_TXT,CUST_LOCALITY,CUST_GROUP,CUST_GROUP_TXT,"
                 "CUST_DOB,CUST_SHIP_TO_EMAIL,CUST_CITY,CUST_WEDDING_ANNIVERSARY,"
                 "CUST_PINCODE,CUST_UUID,CUST_ENTITY_ID,CONSENT_EMAIL,"
                 "CONSENT_WHATSAPP,CONSENT_MOBILE "
                 "FROM `whr-asia-datalake-nonprod.WHR_DATALAKE.CONSUMER_EXTRACT`",
           use_standard_sql=True))
     # Each BigQuery row is a dict; keep only the column values.
     | 'read values' >> beam.Map(lambda x: x.values())
     # Quote every column and join with '|'.
     | 'CSV format' >> beam.Map(
           lambda row: '|'.join(['"' + str(column) + '"' for column in row]))
     | 'Write_to_GCS' >> beam.io.WriteToText(
           'gs://whr-asia-datalake-dev-standard/outbound/Adobe/Customer_Master' + current_date,
           file_name_suffix='.csv',
           header='CUST_ROLE|CUST_ROLE_TXT|CUST_C4C_CUSTID|CUST_FNAME|CUST_LNAME|'
                  'CUST_EMAIL|CUST_MOBILE|DL_BILL_TO_ADD|DL_SHIP_TO_ADD|CUST_STATE|'
                  'CUST_GENDER_TXT|CUST_LOCALITY|CUST_GROUP|CUST_GROUP_TXT|CUST_DOB|'
                  'CUST_SHIP_TO_EMAIL|CUST_CITY|CUST_WEDDING_ANNIVERSARY|CUST_PINCODE|'
                  'CUST_UUID|CUST_ENTITY_ID|CONSENT_EMAIL|CONSENT_WHATSAPP|CONSENT_MOBILE'))
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    createFile()
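
For context, I launch the pipeline roughly like this (the runner, project and bucket values here are placeholders, not my exact setup):

python Consumer_Extract.py \
    --runner DataflowRunner \
    --project <my-gcp-project> \
    --temp_location gs://<my-temp-bucket>/tmp \
    --region <region>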

My code works fine and creates a CSV file when tested on a few records. However, when running on a large dataset, I get the error below:

/home/radhika_sharma_ibm/Consumer_Extract.py", line 20, in RuntimeError: UnicodeEncodeError: 'ascii' codec can't encode character u'\xc3' in position 8: ordinal not in range(128) [while running 'CSV format']
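
I suspect the str(column) call in the 'CSV format' step is what fails: under Python 2, str() implicitly encodes unicode with the ascii codec. A minimal repro outside Beam (the value here is made up, not my actual data):

# Python 2: BigQuery returns string columns as unicode; str() implicitly
# encodes them as ASCII and fails on any non-ASCII character.
value = u'Jos\xe9'   # hypothetical value containing a non-ASCII character
print(str(value))    # UnicodeEncodeError: 'ascii' codec can't encode character ...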

Can anyone please tell me how to apply encoding in the 'read values' step? I have tried adding the custom coder below and assigned the class in the write step, but the error occurs in the 'CSV format' step.

from apache_beam.coders.coders import Coder


class CustomCoder(Coder):
    """A custom coder used for reading and writing strings as UTF-8."""

    def encode(self, value):
        return value.encode("utf-8", "replace")

    def decode(self, value):
        return value.decode("utf-8", "ignore")

    def is_deterministic(self):
        return True
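
For reference, this is how I attached the coder in the write step (WriteToText accepts a coder argument; CSV_HEADER below just stands for the same pipe-delimited header string used in my pipeline above):

| 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://whr-asia-datalake-dev-standard/outbound/Adobe/Customer_Master' + current_date,
    file_name_suffix='.csv',
    coder=CustomCoder(),  # the coder class defined above
    header=CSV_HEADER)    # same header string as in the pipeline above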

Any suggestions, please? I am really struggling with this.

I have also tried passing encoding='utf-8' in the 'read values' and 'CSV format' steps, as below, but I get a different error, shown underneath:

   | 'read values' >> beam.Map(lambda x: x.values(),encoding='utf-8')
   | 'CSV format' >> beam.Map(lambda row:'|'.join(['"'+ str(column) +'"' for column in row]),encoding='utf-8')


return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 862, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    self.process_method(*args_for_process, **kwargs_for_process),
TypeError: <lambda>() got an unexpected keyword argument 'encoding' [while running 'read values']
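
From the traceback I understand that beam.Map forwards extra positional and keyword arguments to the callable itself, so encoding='utf-8' gets passed to my one-argument lambda. A standalone repro of the same TypeError:

fn = lambda x: x
fn('abc', encoding='utf-8')
# TypeError: <lambda>() got an unexpected keyword argument 'encoding'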
 