I'm trying to set `spark.sql.parquet.output.committer.class`, and nothing I do seems to make the setting take effect.

I'm trying to have many threads write to the same output folder, which would work with `org.apache.spark.sql.parquet.DirectParquetOutputCommitter` since it doesn't use the `_temporary` folder. I'm getting the following error, which is how I know it's not working:
```
Caused by: java.io.FileNotFoundException: File hdfs://path/to/stuff/_temporary/0/task_201606281757_0048_m_000029/some_dir does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:795)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:853)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:849)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:849)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:382)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
```
Note the call to `org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob`, the default class.
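
For context, the writes that hit this error look roughly like the sketch below (the output path, thread count, and DataFrame contents are placeholders, not my actual job):

```python
from threading import Thread
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="concurrent-parquet-writes")
sqlContext = SQLContext(sc)
output_path = "hdfs://path/to/stuff"  # placeholder path

def write_chunk(i):
    # Each thread appends its own DataFrame to the same output folder.
    # With the default committer, every job stages its files under
    # output_path/_temporary, so one job's commit/cleanup can remove task
    # directories another job still needs, which is where the
    # FileNotFoundException above comes from.
    df = sqlContext.range(i * 1000, (i + 1) * 1000)
    df.write.mode("append").parquet(output_path)

threads = [Thread(target=write_chunk, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```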
I've tried the following, based on other SO answers and searches:

- `sc._jsc.hadoopConfiguration().set(key, val)` (this does work for settings like `parquet.enable.summary-metadata`)
- `dataframe.write.option(key, val).parquet`
- Adding `--conf "spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter"` to the `spark-submit` call
- Adding `--conf "spark.sql.parquet.output.committer.class"="org.apache.spark.sql.parquet.DirectParquetOutputCommitter"` to the `spark-submit` call
That's all I've been able to find, and nothing works. It looks like it's not hard to set in Scala but appears impossible in Python.
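
In case I'm doing something obviously wrong, here's roughly how the first two approaches look in my PySpark code (the DataFrame and output path are placeholders):

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="committer-config-attempt")
sqlContext = SQLContext(sc)

committer = "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"

# Attempt 1: set it on the underlying Hadoop configuration.
# This route does work for settings like parquet.enable.summary-metadata,
# but this committer class setting still seems to be ignored.
sc._jsc.hadoopConfiguration().set(
    "spark.sql.parquet.output.committer.class", committer)

df = sqlContext.range(0, 1000)  # placeholder DataFrame

# Attempt 2: pass it as a per-write option.
df.write \
    .option("spark.sql.parquet.output.committer.class", committer) \
    .parquet("hdfs://path/to/stuff")  # placeholder path
```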