I have a Python script running in AWS Glue. Up until last month it was working fine; then I started hitting the following errors:

UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position 13: ordinal not in range(128)
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2013' in position 151: ordinal not in range(128)

It looks like the offending character is an en dash (U+2013), which more than likely was user-entered and which this script did not take into account when I wrote it. I've seen several different fixes suggested on SO, but a lot of them are nasty hacks I'd rather not implement, and I'm having trouble figuring out how to apply a proper fix to my script. My Python is very sparse (Glue generates most of the boilerplate). How can I modify my script to get past this encoding error?
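For context, here is a minimal Python 2 repro of what I believe is happening; the value is made up and is not from the Glue job, but 0xe2 is the first byte of the UTF-8 encoding of U+2013, so I think both messages point at the same character:

# -*- coding: utf-8 -*-
# Python 2: mixing a unicode en dash with byte strings trips the default ascii codec
value = u"Sales \u2013 EMEA"              # user-entered text containing U+2013
str(value)                                # UnicodeEncodeError: 'ascii' codec can't encode character u'\u2013'
value.encode("utf-8").decode("ascii")     # UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2
print("prefix " + value.encode("utf-8"))  # fine once the unicode is explicitly encoded to UTF-8

The most commonly suggested workaround (reload(sys) followed by sys.setdefaultencoding('utf8')) is exactly the kind of hack I'd like to keep out of a production job. Here is the script: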
import boto3
import sys
from datetime import datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
# @params: [JOB_NAME, database, path, company_id, user_id]
args = getResolvedOptions(
sys.argv, ['JOB_NAME', 'database', 'path', 'company_id', 'user_id'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource = glueContext.create_dynamic_frame.from_catalog(
database=args['database'], table_name="employee_data", transformation_ctx="datasource0")
filtered_data = Filter.apply(
frame=datasource, f=lambda x: x["company_id"] == args['company_id'])
filtered_data = Filter.apply(
frame=filtered_data, f=lambda x: x["email"] == args['user_id'])
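# Flatten each record's "diffs" map into one entry per changed field, carrying along the record's metadata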
def get_diffs(rec):
    diff_length = 0
    try:
        diff_length = len(rec["diffs"])
    except:
        pass
    if diff_length > 0:
        for key, value in rec["diffs"].items():
            if len(value) > 0:
                rec[key] = {}
                rec[key]["old"] = value[0]
                rec[key]["new"] = value[1]
                rec[key]["timestamp"] = rec["timestamp"]
                rec[key]["action"] = rec["action"]
                rec[key]["source"] = rec["source"]
                rec[key]["requester"] = rec["requester"] if rec["requester"] != None else ""
                rec[key]["employee"] = rec["email"]
                rec[key]["field"] = key
        del rec["diffs"]
    else:
        rec["0"] = {}
        rec["0"]["old"] = ""
        rec["0"]["new"] = ""
        rec["0"]["timestamp"] = rec["timestamp"]
        rec["0"]["action"] = rec["action"]
        rec["0"]["source"] = rec["source"]
        rec["0"]["requester"] = rec["requester"] if rec["requester"] != None else ""
        rec["0"]["employee"] = rec["email"]
        rec["0"]["field"] = ""
    del rec["payload"]
    del rec["partition"]
    del rec["timestamp"]
    del rec["source"]
    del rec["action"]
    del rec["requester"]
    del rec["email"]
    del rec["company_id"]
    return rec
filtered_data = Map.apply(
frame=filtered_data, f=get_diffs)
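# Convert back to an RDD, flatten each row's per-field dicts into individual elements, and drop empty ones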
new_data_rdd = filtered_data.toDF().rdd
new_data = new_data_rdd.flatMap(lambda row: (row))
new_data = new_data.filter(lambda x: x)
schema = StructType([StructField('action', StringType(), False), StructField('field', StringType(), False), StructField('old', StringType(), False), StructField('employee', StringType(
), False), StructField('source', StringType(), False), StructField('timestamp', StringType(), False), StructField('requester', StringType(), False), StructField('new', StringType(), False)])
datasource0 = glueContext.create_dynamic_frame.from_rdd(
new_data, name='unpivoted', schema=schema)
applymapping1 = ApplyMapping.apply(frame=datasource0, mappings=[("timestamp", "string", "date", "string"), ("employee", "string", "employee", "string"), ("action", "string", "action", "string"), ("field", "string", "employee_field_changed", "string"), (
"old", "string", "previous_value", "string"), ("new", "string", "new_value", "string"), ("source", "string", "source", "string"), ("requester", "string", "changed_by", "string")], transformation_ctx="applymapping1")
repartition = applymapping1.repartition(1)
file_path = "s3://"+args["path"]+"/audit_log/" + \
args["company_id"]+"/"+args["user_id"]
datasink2 = glueContext.write_dynamic_frame.from_options(frame=repartition, connection_type="s3", connection_options={
"path": file_path}, format="csv", transformation_ctx="datasink2")
client = boto3.client('s3')
prefix = "audit_log/"+args["company_id"]+"/"+args["user_id"]
audit_key = prefix+"/audit.csv"
client.delete_object(Bucket=args["path"], Key=audit_key)
response = client.list_objects(Bucket=args["path"], Prefix=prefix)
name = response["Contents"][0]["Key"]
client.copy_object(Bucket=args["path"],
CopySource=args["path"] + "/" + name, Key=audit_key)
client.delete_object(Bucket=args["path"], Key=name)
job.commit()
Here is the traceback:
Log Contents:
Traceback (most recent call last):
File "runscript.py", line 63, in <module>
default_error_msg = "
{}
:
{}
".format(e_type.__name__, str(e_value).split("\n")[0])
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2013' in position 151: ordinal not in range(128)