
I am trying to read multiple Parquet files from an S3 bucket containing several days' worth of data.

S3 path: s3n://<s3path>/dt=*/*.snappy.parquet

PySpark code to read data from multiple Parquet files:

s="SELECT * FROM parquet.`s3n://<s3path>/dt=*/*.snappy.parquet`"
c = sqlContext.sql(s)
c.show(2)

The error is:

[Stage 3:> (0 + 3) /
java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
  at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
  at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Working code to read the files for a single date from the S3 bucket:

s2="SELECT * FROM parquet.`s3n://<s3path>/dt=2016-02-02/*.snappy.parquet`"
c2 = sqlContext.sql(s2)
c2.show(2)


+------+--------+---------------+-------------+-----------+
|CL1   |CL2     |CL3            |CL3          |  r_CD     |
+------+--------+---------------+-------------+-----------+
|   18 |YGh4c   |     2016-02-02|     00:32:02|        AC |
|   18 |YGh4c   |     2016-02-02|     00:32:02|        IC |
+------+--------+---------------+-------------+-----------+

I am able to read the files in the bucket individually using the same PySpark command. Why does scanning the whole bucket give an error? What is the correct way to do this?

Stefan Crain
NewBee
  • Do all the Parquet files have the same schema definition and data types? You typically see this error if one of the Parquet files has either an additional column or a different data type (e.g. string instead of integer). – Alex Mar 30 '18 at 15:39
  • Yes, all the Parquet files have the same schema, and I am able to select individual columns like this: c.select('col1').show(). But when I try to select another column, 'col2', in the same way from the whole bucket, it gives the error above; when I select it from only one date in the bucket, there is no error. – NewBee Mar 30 '18 at 16:33
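
One way to test the schema-mismatch idea from the comment above is to compare the column types reported for each dt= directory. The stack trace is consistent with this: decodeToInt is being called on a binary dictionary, which suggests a column is read as an integer while at least one file stores it as a string. The sketch below is only a minimal helper, not a confirmed fix. The function name find_type_conflicts and the sample input are hypothetical; the input shape is assumed to match the list of (column, type) pairs that DataFrame.dtypes returns.

```python
from collections import defaultdict


def find_type_conflicts(schemas):
    """Return columns whose type differs between partitions.

    `schemas` maps a partition path to a list of (column, type) pairs,
    the same shape PySpark's DataFrame.dtypes returns.
    """
    # column -> type -> list of partitions that use that type
    seen = defaultdict(lambda: defaultdict(list))
    for path, dtypes in schemas.items():
        for column, dtype in dtypes:
            seen[column][dtype].append(path)

    # Keep only columns that appear with more than one type.
    return {
        column: {dtype: sorted(paths) for dtype, paths in by_type.items()}
        for column, by_type in seen.items()
        if len(by_type) > 1
    }


# Hypothetical input: CL2 is a string on one date and an int on another.
example = {
    "dt=2016-02-02": [("CL1", "int"), ("CL2", "string")],
    "dt=2016-02-03": [("CL1", "int"), ("CL2", "int")],
}
conflicts = find_type_conflicts(example)
print(conflicts)
```

With Spark, the input could be built by reading each date directory on its own, for example sqlContext.read.parquet(path).dtypes for every path in a list of dt= directories you supply. Any column reported here would be a candidate for the failure. Note that this helper only flags type differences; a column that is missing from some partitions is not reported.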
