I have followed the post Write BigQuery results to GCS in CSV format using Apache Beam and written code to export BigQuery results to GCS in CSV format using Apache Beam. My code is below:
from __future__ import absolute_import

import apache_beam as beam
import argparse
import pickle
import logging
# import sys
# reload(sys)
# sys.setdefaultencoding('utf8')
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
from datetime import date

today = date.today()
current_date = today.strftime("%Y%m%d")


def createFile(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(
         query="SELECT CUST_ROLE,CUST_ROLE_TXT,CUST_C4C_CUSTID,CUST_FNAME,"
               "CUST_LNAME,CUST_EMAIL,CUST_MOBILE,DL_BILL_TO_ADD,DL_SHIP_TO_ADD,"
               "CUST_STATE,CUST_GENDER_TXT,CUST_LOCALITY,CUST_GROUP,CUST_GROUP_TXT,"
               "CUST_DOB,CUST_SHIP_TO_EMAIL,CUST_CITY,CUST_WEDDING_ANNIVERSARY,"
               "CUST_PINCODE,CUST_UUID,CUST_ENTITY_ID,CONSENT_EMAIL,"
               "CONSENT_WHATSAPP,CONSENT_MOBILE "
               "FROM `whr-asia-datalake-nonprod.WHR_DATALAKE.CONSUMER_EXTRACT`",
         use_standard_sql=True))
     | 'read values' >> beam.Map(lambda x: x.values())
     | 'CSV format' >> beam.Map(
         lambda row: '|'.join(['"' + str(column) + '"' for column in row]))
     | 'Write_to_GCS' >> beam.io.WriteToText(
         'gs://whr-asia-datalake-dev-standard/outbound/Adobe/Customer_Master'
         + current_date,
         file_name_suffix='.csv',
         header='CUST_ROLE|CUST_ROLE_TXT|CUST_C4C_CUSTID|CUST_FNAME|CUST_LNAME|'
                'CUST_EMAIL|CUST_MOBILE|DL_BILL_TO_ADD|DL_SHIP_TO_ADD|CUST_STATE|'
                'CUST_GENDER_TXT|CUST_LOCALITY|CUST_GROUP|CUST_GROUP_TXT|CUST_DOB|'
                'CUST_SHIP_TO_EMAIL|CUST_CITY|CUST_WEDDING_ANNIVERSARY|'
                'CUST_PINCODE|CUST_UUID|CUST_ENTITY_ID|CONSENT_EMAIL|'
                'CONSENT_WHATSAPP|CONSENT_MOBILE'))
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    createFile()
My code works fine and creates a CSV file when tested on a few records. However, when I run it against the full dataset, I get the error below:
/home/radhika_sharma_ibm/Consumer_Extract.py", line 20, in RuntimeError: UnicodeEncodeError: 'ascii' codec can't encode character u'\xc3' in position 8: ordinal not in range(128) [while running 'CSV format']
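From what I understand, str() on a Python 2 unicode object implicitly encodes it with the ascii codec, which fails on any non-ASCII character. A minimal reproduction outside Beam (my assumption about the cause; u'\xc3' is simply the character named in the traceback):

value = u'\xc3'          # non-ASCII character from the error message
str(value)               # UnicodeEncodeError: 'ascii' codec can't encode character u'\xc3'
value.encode('utf-8')    # works: explicit UTF-8 encoding returns a byte string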
Can anyone tell me how to apply an encoding in the 'read values' step? I have tried adding the custom coder below and assigning the class in the write step, but the error is raised in the 'CSV format' step.
from apache_beam.coders.coders import Coder


class CustomCoder(Coder):
    """A custom coder used for reading and writing strings as UTF-8."""

    def encode(self, value):
        return value.encode("utf-8", "replace")

    def decode(self, value):
        return value.decode("utf-8", "ignore")

    def is_deterministic(self):
        return True
Any suggestions, please? I am really struggling with this.
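Would encoding inside the 'CSV format' step itself be the right approach? Something like this sketch (untested; it assumes the values coming out of BigQuery are Python 2 unicode objects or ASCII-safe types):

| 'CSV format' >> beam.Map(
      lambda row: u'|'.join([u'"' + unicode(column) + u'"' for column in row])
                  .encode('utf-8'))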
I have also tried the following in the 'read values' and 'CSV format' steps, but that produces a different error, shown below:
| 'read values' >> beam.Map(lambda x: x.values(), encoding='utf-8')
| 'CSV format' >> beam.Map(
      lambda row: '|'.join(['"' + str(column) + '"' for column in row]),
      encoding='utf-8')
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
File "apache_beam/runners/common.py", line 862, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
TypeError: <lambda>() got an unexpected keyword argument 'encoding' [while running 'read values']
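As far as I can tell, beam.Map forwards any extra positional and keyword arguments to the callable, so encoding='utf-8' is passed straight into my lambda, which does not accept it:

beam.Map(lambda x: x.values(), encoding='utf-8')
# is invoked roughly as (lambda x: x.values())(element, encoding='utf-8'),
# hence the unexpected-keyword-argument TypeError

So passing encoding as a keyword to beam.Map is clearly not the way to apply an encoding either.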