
I'm getting an error while running the auto-generated script in AWS Glue. The data contains a struct field and is being written back into the same table that was created previously. Error log:

py4j.protocol.Py4JJavaError: An error occurred while calling o133.pyWriteDynamicFrame. : com.amazonaws.services.glue.util.SchemaException: Cannot write struct field tags to CSV

Please guide me on where I made a mistake.

    import sys
    # standard imports at the top of every Glue-generated script, restored here
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job

    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])

    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    ## @type: DataSource
    ## @args: [database = "experimentdb", table_name = "datalakexperiment", transformation_ctx = "datasource0"]
    ## @return: datasource0
    ## @inputs: []
    datasource0 = glueContext.create_dynamic_frame.from_catalog(
        database = "experimentdb",
        table_name = "datalakexperiment",
        transformation_ctx = "datasource0")

    ## @type: ApplyMapping
    ## @args: [mapping = <same list as below>, transformation_ctx = "applymapping1"]
    ## @return: applymapping1
    ## @inputs: [frame = datasource0]
    applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [
        ("id", "string", "id", "string"),
        ("identifier", "string", "identifier", "string"),
        ("session_count", "long", "session_count", "long"),
        ("language", "string", "language", "string"),
        ("timezone", "long", "timezone", "long"),
        ("game_version", "string", "game_version", "string"),
        ("device_os", "string", "device_os", "string"),
        ("device_type", "long", "device_type", "long"),
        ("device_model", "string", "device_model", "string"),
        ("ad_id", "string", "ad_id", "string"),
        ("tags.phone_number", "string", "tags.phone_number", "string"),
        ("tags.real_name", "string", "tags.real_name", "string"),
        ("tags.email", "string", "tags.email", "string"),
        ("tags.onboardingStatus", "string", "tags.onboardingStatus", "string"),
        ("tags.dfuStatus", "string", "tags.dfuStatus", "string"),
        ("tags.activityStatus", "string", "tags.activityStatus", "string"),
        ("tags.lastOperationPerformed", "string", "tags.lastOperationPerformed", "string"),
        ("last_active", "string", "last_active", "string"),
        ("playtime", "long", "playtime", "long"),
        ("amount_spent", "double", "amount_spent", "double"),
        ("created_at", "string", "created_at", "string"),
        ("invalid_identifier", "string", "invalid_identifier", "string"),
        ("badge_count", "long", "badge_count", "long"),
    ], transformation_ctx = "applymapping1")

    ## @type: SelectFields
    ## @args: [paths = <same list as below>, transformation_ctx = "selectfields2"]
    ## @return: selectfields2
    ## @inputs: [frame = applymapping1]
    selectfields2 = SelectFields.apply(frame = applymapping1, paths = [
        "id", "identifier", "session_count", "language", "timezone",
        "game_version", "device_os", "device_type", "device_model", "ad_id",
        "tags", "last_active", "playtime", "amount_spent", "created_at",
        "invalid_identifier", "badge_count"],
        transformation_ctx = "selectfields2")

    ## @type: ResolveChoice
    ## @args: [choice = "MATCH_CATALOG", database = "experimentdb", table_name = "datalakexperiment", transformation_ctx = "resolvechoice3"]
    ## @return: resolvechoice3
    ## @inputs: [frame = selectfields2]
    resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG",
        database = "experimentdb", table_name = "datalakexperiment",
        transformation_ctx = "resolvechoice3")

    ## @type: DataSink
    ## @args: [database = "experimentdb", table_name = "datalakexperiment", transformation_ctx = "datasink4"]
    ## @return: datasink4
    ## @inputs: [frame = resolvechoice3]
    datasink4 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice3,
        database = "experimentdb", table_name = "datalakexperiment",
        transformation_ctx = "datasink4")
    job.commit()
Parag Shahade

1 Answer


You have a struct object somewhere in your table schema, and CSV cannot represent nested structs. So you can either:

  1. use a hierarchical output format, like JSON, or
  2. flatten out the struct (more below).
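To see the difference concretely without needing Glue or Spark at all, here is a stdlib-only sketch (the record below is made up for illustration): JSON round-trips the nested `tags` object unchanged, while CSV needs the struct flattened into dotted column names first.

```python
import csv
import io
import json

# A made-up record shaped like the question's schema: "tags" is a nested struct.
record = {"id": "42", "tags": {"email": "a@b.c", "real_name": "Ada"}}

# JSON is hierarchical, so the nested struct survives serialization as-is.
as_json = json.dumps(record)
assert json.loads(as_json)["tags"]["email"] == "a@b.c"

# CSV is flat: the struct must first be flattened into dotted column names.
flat = {"id": record["id"],
        "tags.email": record["tags"]["email"],
        "tags.real_name": record["tags"]["real_name"]}

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=list(flat))
writer.writeheader()
writer.writerow(flat)
print(buf.getvalue())
```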

There are solutions for flattening structs here: How to flatten a struct in a Spark dataframe?

Here is my implementation:

    from pyspark.sql.types import ArrayType, StructType

    def flatten(schema, prefix=None):
        """Flattens out a nested schema, as CSV doesn't support nesting.

        NOTE: If different nested schemas have same-named columns, the last one
        found will overwrite any earlier instances of that column."""
        fields = []
        for field in schema.fields:
            name = f"{prefix}.{field.name}" if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType
            if isinstance(dtype, StructType):
                fields += flatten(dtype, prefix=name)
            else:
                fields.append(name)
        return fields
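To see what the recursion produces without a Spark runtime, here is the same logic exercised against minimal stand-in classes (hypothetical mocks mimicking the shape of `pyspark.sql.types`, not the real classes): the nested `tags` struct expands into dotted leaf names.

```python
from dataclasses import dataclass
from typing import Any, List

# Minimal stand-ins for pyspark.sql.types, just enough for flatten() to walk.
@dataclass
class Field:
    name: str
    dataType: Any

@dataclass
class StructType:
    fields: List[Field]

@dataclass
class ArrayType:
    elementType: Any

class StringType:
    pass

def flatten(schema, prefix=None):
    """Same traversal as the answer's flatten(), repeated for a self-contained demo."""
    fields = []
    for field in schema.fields:
        name = f"{prefix}.{field.name}" if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, ArrayType):
            dtype = dtype.elementType
        if isinstance(dtype, StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)
    return fields

# A schema shaped like the question's table: "tags" is a nested struct.
schema = StructType([
    Field("id", StringType()),
    Field("tags", StructType([Field("email", StringType()),
                              Field("real_name", StringType())])),
    Field("badge_count", StringType()),
])
print(flatten(schema))  # → ['id', 'tags.email', 'tags.real_name', 'badge_count']
```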

Then, just before you write your output, invoke this on your DataFrame, something like:

    # in your imports:
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame

    # in your process:
    spark_context = SparkContext.getOrCreate()
    glue_context = GlueContext(spark_context)
    resolvechoice3 = resolvechoice3.toDF()  # convert to DataFrame
    resolvechoice3 = resolvechoice3.select(flatten(resolvechoice3.schema))  # flatten
    resolvechoice3 = DynamicFrame.fromDF(resolvechoice3, glue_context, "final_convert")  # convert back to DynamicFrame

    # and then output as usual
    datasink4 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice3,
        database = "experimentdb", table_name = "datalakexperiment",
        transformation_ctx = "datasink4")
    job.commit()
jonlegend
  • I have added as mentioned. Getting error as ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python: Traceback (most recent call last): File "/tmp/testing", line 52, in resolvechoice3 = resolvechoice3.select(flatten(resolvechoice3.schema)) AttributeError: 'DynamicFrame' object has no attribute 'selec – Parag Shahade May 14 '21 at 08:33
  • I made an update to convert to / from dataframe, as that is needed for the select to work. – jonlegend May 14 '21 at 17:22
  • In case someone comes here looking for answers: I had a similar issue but was already using the function given by @ParagShahade. The error was due to two things: (i) my messy columns had dots in their names but were not nested; for Spark to take the literal name, add '`' at both ends; (ii) even though ApplyMapping changes dotted names inside containers locally, on AWS it doesn't, so I had to convert to a DataFrame and make the changes with Spark syntax, not Glue's built-in transforms. – Andrés Tello Urrea Feb 24 '23 at 15:55
  • Additionally: for the "flatten" function to work directly on DynamicFrames, just add "gluetypes.ArrayType" and "gluetypes.StructType" inside "isinstance". – Andrés Tello Urrea Feb 24 '23 at 15:58