I have a Spark job continuously uploading Parquet files to S3 (with partitions).
The files all have the same parquet schema.

One of the field types was recently changed (from String to Long), so some of the partitions now contain files with different Parquet schemas.

Partitions holding data of both types now fail when part of their content is read.
I can still execute: sqlContext.read.load(path)
but any operation that fetches data from the DataFrame (collect, for example) fails with a ParquetDecodingException.

I intend to migrate the data and rewrite it in a single format, but I cannot read the mixed content into a DataFrame.
How can I load the mixed partitions using Apache Spark into DataFrames or any other Spark construct?

The ParquetDecodingException trace follows:

scala> df.collect
[Stage 1:==============>        (1 + 3) / 4]
WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, 172.1.1.1, executor 0): org.apache.parquet.io.ParquetDecodingException: 
Can not read value at 1 in block 0 in file 
s3a://data/parquet/partition_by_day=20180620/partition_by_hour=10/part-00000-6e4f07e4-3d89-4fad-acdf-37054107dc39.snappy.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
Leet-Falcon

2 Answers


As far as I know, you cannot mix two schemas that define the same field with different types. The only solution I can think of is therefore to:

  1. List the files of the partition.

  2. Rewrite each file to a new location, transforming the data to the correct schema.

  3. If the original data was partitioned, make another pass to restore the partitioning.
    This is needed because rewriting the data file by file discards the partitioning.

  4. Check that all of the new partition can be read with the correct schema.

  5. Remove the "bad" partition and copy the temporary partition in its place.
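
The steps above could be sketched roughly as follows. This is only an illustration under several assumptions: Spark 2.x with a SparkSession, a hypothetical column name `my_field`, and the partition keys taken from the path in the question. The object and method names (`RewriteMixedPartition`, `rewrite`) and the `tableRoot`, `partitionDir` and `tmpRoot` parameters are made up for the example. It also deviates slightly from the list: the `basePath` read option lets Spark recover the partition columns from each file's path, so the partitioning is restored during the write instead of in a separate pass.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count}
import org.apache.spark.sql.types.LongType

object RewriteMixedPartition {
  // Hypothetical names; replace with the real column and partition keys.
  val field = "my_field"
  val partitionCols = Seq("partition_by_day", "partition_by_hour")

  def rewrite(spark: SparkSession, tableRoot: String,
              partitionDir: String, tmpRoot: String): Unit = {
    val dir = new Path(partitionDir)
    val fs = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // Step 1: list the Parquet files of the partition.
    val files = fs.listStatus(dir).map(_.getPath.toString).filter(_.endsWith(".parquet"))

    // Steps 2-3: read every file on its own (so only that file's schema applies),
    // cast the field to long, and append to the temporary location.
    // Values that are not valid numbers become null after the cast.
    files.foreach { file =>
      spark.read
        .option("basePath", tableRoot) // keeps the partition columns from the path
        .parquet(file)
        .withColumn(field, col(field).cast(LongType))
        .write.mode("append")
        .partitionBy(partitionCols: _*)
        .parquet(tmpRoot)
    }

    // Step 4: re-read the rewritten data and force the column to be decoded.
    val fixed = spark.read.parquet(tmpRoot)
    require(fixed.schema(field).dataType == LongType, s"$field is not a long")
    fixed.agg(count(col(field))).first()
  }
}
```

Step 5 (swapping the temporary partition in for the "bad" one) is left out on purpose, since it is destructive and depends on how the bucket is laid out.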
Ehud Lev
  • Precise answer which proved valid & correct. Added step (4) for a full answer for partitioned data as was in my case. – Leet-Falcon Jul 04 '18 at 21:04

Here is another idea: instead of changing the type of the existing field (field_string), add a new field of type long (field_long), enable schema merging, and update the code that reads the data to something like the following. Schema merging is disabled by default in recent Spark versions, so it has to be enabled explicitly:

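import org.apache.spark.sql.functions.{coalesce, col}

val df = sqlContext.read.option("mergeSchema", "true").parquet(<parquet_file>)

...

// Use field_long when it is set; otherwise fall back to field_string cast to long
val result = df.withColumn("field_value_long",
  coalesce(col("field_long"), col("field_string").cast("long")))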
Denis Makarenko
  • Schema merging has been disabled by default since Spark 1.5. Also `sqlContext` is deprecated, prefer `val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")` when writing new code. – Wade Jensen Jun 23 '18 at 12:14