I created a custom ParquetOutputFormat (a class in the org.apache.parquet.hadoop package) to override the getRecordWriter method. Inside getRecordWriter it accesses CodecFactory, which causes an IllegalAccessError. To try to fix the issue I created my own class loader, but this did not help. I followed this blog post: http://techblog.applift.com/upgrading-spark#advanced-case-parquet-writer
Before I created the custom class loader, I was using the CustomParquetOutputFormat as follows:
override def createOutputFormat: OutputFormat[Void, InternalRow] with Ext =
  new CustomParquetOutputFormat[InternalRow]() with Ext {
    ...
  }
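For context, the subclass looks roughly like this (a simplified sketch, not my exact code):

// Simplified sketch: the class is declared in org.apache.parquet.hadoop so it
// can see the package-private CodecFactory at compile time.
package org.apache.parquet.hadoop

import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}

class CustomParquetOutputFormat[T] extends ParquetOutputFormat[T] {
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Void, T] = {
    val conf = context.getConfiguration
    val codecFactory = new CodecFactory(conf) // the statement that later throws
    // ... build and return the custom RecordWriter ...
    ???
  }
}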
The issue happens when CustomParquetOutputFormat tries to access CodecFactory when getRecordWriter is called:

CodecFactory codecFactory = new CodecFactory(conf);

(This is line 274 of ParquetOutputFormat, which CustomParquetOutputFormat accesses.) CodecFactory is package-private.
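This is why the subclass is declared in org.apache.parquet.hadoop. As far as I understand, the JVM checks package-private access against the runtime package, which is the package name plus the defining class loader; roughly (illustrative only, not Parquet code):

// Two classes share a runtime package only if both the package name and the
// defining class loader match; same-name packages across different loaders
// do not count.
def sameRuntimePackage(a: Class[_], b: Class[_]): Boolean =
  a.getPackage.getName == b.getPackage.getName &&
    (a.getClassLoader eq b.getClassLoader)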
Custom Class Loader:
class CustomClassLoader(urls: Array[URL], parent: ClassLoader, whiteList: List[String])
  extends ChildFirstURLClassLoader(urls, parent) {

  // Classes whose names match the whitelist are loaded child-first, so they
  // are defined by this loader; everything else is delegated to the parent.
  override def loadClass(name: String): Class[_] = {
    if (whiteList.exists(name.startsWith)) {
      super.loadClass(name)
    } else {
      parent.loadClass(name)
    }
  }
}
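The intent is that every whitelisted class is defined by this one child loader, so CustomParquetOutputFormat and CodecFactory should end up in the same runtime package, while everything else still comes from the parent.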
Usage:
// Build the child-first loader over the application jars, whitelisting the
// Parquet classes that need to share a runtime package
val sc: SparkContext = SparkContext.getOrCreate()
val cl: CustomClassLoader = new CustomClassLoader(
  sc.jars.map(new URL(_)).toArray,
  Thread.currentThread.getContextClassLoader,
  List(
    "org.apache.parquet.hadoop.CustomParquetOutputFormat",
    "org.apache.parquet.hadoop.CodecFactory",
    "org.apache.parquet.hadoop.ParquetFileWriter",
    "org.apache.parquet.hadoop.ParquetRecordWriter",
    "org.apache.parquet.hadoop.InternalParquetRecordWriter",
    "org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
    "org.apache.parquet.hadoop.MemoryManager"
  ))

// Instantiate the output format reflectively through the custom loader
cl.loadClass("org.apache.parquet.hadoop.CustomParquetOutputFormat")
  .getConstructor(classOf[String], classOf[TaskAttemptContext])
  .newInstance(fullPathWithoutExt, taskAttemptContext)
  .asInstanceOf[OutputFormat[Void, InternalRow] with ProvidesExtension]
Error:
java.lang.IllegalAccessError: tried to access class org.apache.parquet.hadoop.CodecFactory from class org.apache.parquet.hadoop.customParquetOutputFormat
at org.apache.parquet.hadoop.CustomParquetOutputFormat.getRecordWriter(CustomParquetOutputFormat.scala:40)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
at org.apache.spark.custom.hadoop.HadoopWriter.<init>(HadoopWriter.scala:35)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriter.<init>(ParquetWriter.scala:16)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriterFactory.createWriter(ParquetWriterFactory.scala:71)
at com.abden.custom.index.IndexBuilder$$anonfun$4.apply(IndexBuilder.scala:55)
at com.abden.custom.index.IndexBuilder$$anonfun$4.apply(IndexBuilder.scala:54)
at scala.collection.immutable.Stream.map(Stream.scala:418)
at com.abden.custom.index.IndexBuilder.generateTiles(IndexBuilder.scala:54)
at com.abden.custom.index.IndexBuilder.generateLayer(IndexBuilder.scala:155)
at com.abden.custom.index.IndexBuilder.appendLayer(IndexBuilder.scala:184)
at com.abden.custom.index.IndexBuilder$$anonfun$appendLayers$1$$anonfun$apply$1.apply(IndexBuilder.scala:213)
at com.abden.custom.index.IndexBuilder$$anonfun$appendLayers$1$$anonfun$apply$1.apply(IndexBuilder.scala:210)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at com.abden.custom.util.SplittingByKeyIterator.foreach(SplittingByKeyIterator.scala:3)
at com.abden.custom.index.IndexBuilder$$anonfun$appendLayers$1.apply(IndexBuilder.scala:210)
at com.abden.custom.index.IndexBuilder$$anonfun$appendLayers$1.apply(IndexBuilder.scala:209)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The error happens at this line in my getRecordWriter:

val codecFactory = new CodecFactory(conf)

CodecFactory has no access modifier, so it is restricted to its package. Even with the dynamic class loader, which should load all of these classes through the same class loader, I still get the IllegalAccessError.
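A hypothetical diagnostic (using the cl from above) would be to check which loader actually defined each class, since different defining loaders would put them in different runtime packages even though the package name matches:

val fmt = cl.loadClass("org.apache.parquet.hadoop.CustomParquetOutputFormat")
val codec = cl.loadClass("org.apache.parquet.hadoop.CodecFactory")
// If these print different loaders, the package-private access must fail.
println(s"format loader: ${fmt.getClassLoader}")
println(s"codec loader:  ${codec.getClassLoader}")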