I'm struggling with what is supposed to be a simple task (maybe I don't understand what's under the hood in the Glue Data Catalog).
- I have an S3 bucket with parquet files
- I ran a Glue crawler over it to create a catalogue table so I could query it in Athena, etc.
- I'm trying to create a Glue job that retrieves the data from that catalogue table, partitions it, and saves it to another S3 bucket. I receive this exception every time I use the catalogue table as the source:
Py4JJavaError: An error occurred while calling o497.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 29 in stage 39.0 failed 4 times, most recent failure: Lost task 29.3 in stage 39.0 (TID 437, 172.34.18.186, executor 1): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
at org.apache.parquet.column.Dictionary.decodeToDouble(Dictionary.java:57)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToDouble(ParquetDictionary.java:46)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getDouble(OnHeapColumnVector.java:460)
...
However, if I use the initial S3 bucket as the source, it works. I pulled the data from both sources and compared the schemas (they are the same, at least on the surface).
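For reference, here is a sketch of the kind of comparison I did (the boto3 get_table call and the direct spark.read are illustrative; the names and the Spark session match the job below). It prints the column types the crawler wrote into the catalogue next to what Spark infers straight from the parquet file:

import boto3

glue_client = boto3.client("glue")
table = glue_client.get_table(DatabaseName="ss-demo-database", Name="tlc_green_data")

# Column types as the crawler recorded them in the Data Catalog
for col in table["Table"]["StorageDescriptor"]["Columns"]:
    print(col["Name"], col["Type"])

# Schema Spark infers directly from the parquet file itself
spark.read.parquet("s3://test/tlc_green_data/green_tripdata_2014-02.parquet").printSchema()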
Here is my test glue job code:
import sys
import pandas as pd
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as f
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import StructField, StructType, StringType,LongType
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
## Read the data
from_catalogue = glueContext.create_dynamic_frame.from_catalog(database = "ss-demo-database", table_name = "tlc_green_data", transformation_ctx = "datasource0")
from_s3 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": ["s3://test/tlc_green_data/green_tripdata_2014-02.parquet"],
        "recurse": False,
    },
    transformation_ctx="from_s3",
)
from_s3.printSchema()
from_catalogue.printSchema()
S3_location = "s3://test/demo/"
# Results are stored back to an S3 bucket (this works with the from_s3 frame, fails with from_catalogue)
datasink = glueContext.write_dynamic_frame_from_options(
    frame=from_catalogue,
    connection_type="s3",
    connection_options={"path": S3_location},
    format="parquet",
    format_options={"compression": "snappy"},
    transformation_ctx="datasink",
)
job.commit()
I tried converting to a Spark DataFrame and writing with the overwrite option, and I tried selecting only 10 rows first; same error both times (roughly as sketched below).
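A sketch of those two attempts (it reuses from_catalogue, glueContext, DynamicFrame, and S3_location from the job above; "datasink_small" is just an illustrative transformation_ctx name):

# Attempt 1: convert the catalogue DynamicFrame to a Spark DataFrame and overwrite the target path
df = from_catalogue.toDF()
df.write.mode("overwrite").parquet(S3_location)

# Attempt 2: take only 10 rows, convert back to a DynamicFrame, and write as before
small_dyf = DynamicFrame.fromDF(df.limit(10), glueContext, "small_dyf")
glueContext.write_dynamic_frame_from_options(
    frame=small_dyf,
    connection_type="s3",
    connection_options={"path": S3_location},
    format="parquet",
    format_options={"compression": "snappy"},
    transformation_ctx="datasink_small",
)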
Your help is greatly appreciated!