
I'm struggling with what is supposed to be a simple task (maybe I don't understand what's under the hood in the Glue Data Catalog).

  1. I have an S3 bucket with parquet files.
  2. I ran a Glue crawler over it to create a catalogue table so I could query it in Athena, etc.
  3. I'm trying to create a Glue job that retrieves the data from that catalogue table, partitions it, and saves it to another S3 bucket. I receive this exception every time I use the catalogue as the source:
Py4JJavaError: An error occurred while calling o497.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)

...

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 29 in stage 39.0 failed 4 times, most recent failure: Lost task 29.3 in stage 39.0 (TID 437, 172.34.18.186, executor 1): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
    at org.apache.parquet.column.Dictionary.decodeToDouble(Dictionary.java:57)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToDouble(ParquetDictionary.java:46)
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getDouble(OnHeapColumnVector.java:460)

...

However, if I use the initial S3 bucket as the source, it works. I pulled the data from both sources and compared schemas (they are the same, at least on the surface).
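To compare below the surface, one thing I can do is dump the column types the catalogue actually stores and check them against the parquet footer types; a minimal sketch using boto3 (database and table names are the ones from the job below):

import boto3

glue = boto3.client("glue")
table = glue.get_table(DatabaseName="ss-demo-database", Name="tlc_green_data")

# Column types as the Data Catalog records them. If one of these says
# "double" where the parquet file actually stores int64/long, that would
# explain the decodeToDouble failure in the trace above.
for column in table["Table"]["StorageDescriptor"]["Columns"]:
    print(column["Name"], column["Type"])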

Here is my test Glue job code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


## Read the data via the catalogue table
from_catalogue = glueContext.create_dynamic_frame.from_catalog(
    database="ss-demo-database",
    table_name="tlc_green_data",
    transformation_ctx="datasource0",
)



from_s3 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": ["s3://test/tlc_green_data/green_tripdata_2014-02.parquet"],
        "recurse": False,
    },
    transformation_ctx="from_s3",
)


from_s3.printSchema()

from_catalogue.printSchema()


S3_location = "s3://test/demo/"

# Results are stored back to S3 (this works when the frame comes from S3,
# but fails when it comes from the catalogue)
datasink = glueContext.write_dynamic_frame_from_options(
    frame=from_catalogue,
    connection_type="s3",
    connection_options={"path": S3_location},
    format="parquet",
    format_options={"compression": "snappy"},
    transformation_ctx="datasink",
)

job.commit()

I tried converting to a Spark DataFrame and writing with the overwrite option: same error. I tried selecting only 10 rows: same error.
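For reference, this is roughly what those attempts looked like (a minimal sketch; it fails with the same UnsupportedOperationException):

# Variant: convert to a Spark DataFrame, keep only 10 rows, write with overwrite.
# Fails the same way, since the scan of the catalogue-sourced frame is what
# actually blows up once the write triggers it.
from_catalogue.toDF() \
    .limit(10) \
    .write \
    .mode("overwrite") \
    .parquet(S3_location)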

Your help is greatly appreciated!

  • Take a look [here](https://stackoverflow.com/questions/73032693/spark-error-parquet-column-values-dictionary-plainvaluesdictionaryplainintege), it is a datatype issue, which is explained in the answer to that question. – ms12 Jan 09 '23 at 18:33
  • I looked there. The schemas are the same. Can you please be more specific? – Nikita Voevodin Jan 09 '23 at 19:36
  • The schema is the same (when looking at the data), but the Glue catalog isn't mapping it correctly. If you look at the linked example, the OP faced a similar issue, but with a bigint type. As I understand it, you need to deliberately cast all of your columns to the correct types when writing the source table (a wrong cast there may be what raised this issue), and then recrawl the data; see the sketch after these comments. – ms12 Jan 09 '23 at 19:55
  • I'll see what I can do, but I would imagine this to be a more widespread issue. I even reduced the df to just one column and cast it to string - same error when trying to write. It's as if it keeps the original schema and tries to match against it. – Nikita Voevodin Jan 09 '23 at 20:49
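A minimal sketch of the workaround ms12 describes, assuming (hypothetically) that the failing column is trip_distance, stored as long in the parquet files but recorded as double in the catalogue; the column name and the output prefix are placeholders to substitute:

# Rewrite the source data with explicit types, then recrawl it so the
# files and the catalogue agree. trip_distance is hypothetical - use
# whichever column the stack trace points at.
from pyspark.sql.functions import col

df = spark.read.parquet("s3://test/tlc_green_data/")
df = df.withColumn("trip_distance", col("trip_distance").cast("double"))
df.write.mode("overwrite").parquet("s3://test/tlc_green_data_recast/")
# Then point the crawler at s3://test/tlc_green_data_recast/ and rerun it
# so the catalogue table is rebuilt from the recast files.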
