
Output files generated via the Spark SQL DataFrame.write() method begin with the "part" basename prefix. For example:

DataFrame sample_07 = hiveContext.table("sample_07");
sample_07.write().parquet("sample_07_parquet");

Results in:

hdfs dfs -ls sample_07_parquet/
Found 4 items
-rw-r--r--   1 rob rob          0 2016-03-19 16:40 sample_07_parquet/_SUCCESS
-rw-r--r--   1 rob rob        491 2016-03-19 16:40 sample_07_parquet/_common_metadata
-rw-r--r--   1 rob rob       1025 2016-03-19 16:40 sample_07_parquet/_metadata
-rw-r--r--   1 rob rob      17194 2016-03-19 16:40 sample_07_parquet/part-r-00000-cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92.gz.parquet

I would like to change the output filename prefix used when creating a file with Spark SQL's DataFrame.write(). I tried setting the "mapreduce.output.basename" property on the Hadoop configuration for the Spark context. For example:

public class MyJavaSparkSQL {

  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("MyJavaSparkSQL");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    ctx.hadoopConfiguration().set("mapreduce.output.basename", "myprefix");
    HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(ctx.sc());
    DataFrame sample_07 = hiveContext.table("sample_07");
    sample_07.write().parquet("sample_07_parquet");
    ctx.stop();
  }
}

That did not change the output filename prefix for the generated files.

Is there a way to override the output filename prefix when using the DataFrame.write() method?

Rob

3 Answers


You cannot change the "part" prefix while using any of the standard output formats (like Parquet). See this snippet from ParquetRelation source code:

private val recordWriter: RecordWriter[Void, InternalRow] = {
  val outputFormat = {
    new ParquetOutputFormat[InternalRow]() {
      // ...
      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        // ..
        //  prefix is hard-coded here:
        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
      }
    }
  }
}
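As an illustration, plugging the values visible in the question's directory listing into that hard-coded format string reproduces the observed file name (the split number, job id, and extension below are taken from the listing; bucketString is empty when not bucketing):

```scala
// Reproducing the hard-coded pattern above with the values from the
// question's output listing:
val split = 0
val uniqueWriteJobId = "cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92"
val bucketString = ""
val extension = ".gz.parquet"
val name = f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension"
// name == "part-r-00000-cefb2ac6-9f44-4ce4-93d9-8e7de3f2cb92.gz.parquet"
```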

If you really must control the part file names, you'll probably have to implement a custom FileOutputFormat and use one of Spark's save methods that accept a FileOutputFormat class (e.g. saveAsHadoopFile).
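As a rough sketch of what such a custom FileOutputFormat could look like (the class name and "myprefix" basename are made up, and the RDD-based saveAsNewAPIHadoopFile route is assumed rather than DataFrame.write()):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat, TextOutputFormat}

// Hypothetical subclass that swaps the hard-coded "part" basename for "myprefix".
class MyPrefixOutputFormat[K, V] extends TextOutputFormat[K, V] {
  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
    val committer = getOutputCommitter(context).asInstanceOf[FileOutputCommitter]
    // Keep Hadoop's task-unique numbering, but with our own basename.
    new Path(committer.getWorkPath,
      FileOutputFormat.getUniqueFile(context, "myprefix", extension))
  }
}

// Assumed usage with a pair RDD:
// pairRDD.saveAsNewAPIHadoopFile("out_dir",
//   classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text],
//   classOf[MyPrefixOutputFormat[org.apache.hadoop.io.Text, org.apache.hadoop.io.Text]])
```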

Tzach Zohar

Assuming the output folder has only one CSV file in it, we can rename it programmatically (or dynamically) using the code below. The last line gets all files of CSV type from the output directory and renames them to the desired file name.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
val outputfolder_Path = "s3://<s3_AccessKey>:<s3_Securitykey>@<external_bucket>/<path>"     
val fs = FileSystem.get(new java.net.URI(outputfolder_Path), new Configuration())   
fs.globStatus(new Path(outputfolder_Path + "/*.*"))
  .filter(_.getPath.toString.split("/").last.split("\\.").last == "csv")
  .foreach { l =>
    fs.rename(new Path(l.getPath.toString), new Path(outputfolder_Path + "/DesiredFilename.csv"))
  }
Sarath Subramanian

Agree with @Tzach Zohar.

After saving your DataFrame to HDFS or S3, you can rename the files using the def below. The Scala example is ready to use, meaning you can drop it directly into your code or a utility.

Brief:

1) Get all the files under a folder using globStatus.
2) Loop through and rename each file with a prefix or suffix, whatever your case is.

Note: Apache Commons IO is already available on Hadoop clusters, so no further dependencies are needed.

/**
 * prefixHdfsFiles
 * @param outputfolder_Path
 * @param prefix
 */
def prefixHdfsFiles(outputfolder_Path: String, prefix: String) = {
  import org.apache.hadoop.fs._
  import org.apache.hadoop.conf.Configuration
  import org.apache.commons.io.FilenameUtils._
  import java.io.File
  import java.net.URI

  val fs = FileSystem.get(new URI(outputfolder_Path), new Configuration())
  fs.globStatus(new Path(outputfolder_Path + "/*.*")).foreach { l: FileStatus =>
    // new name: same directory, prefix prepended to the file name
    val newhdfsfileName = new Path(
      getFullPathNoEndSeparator(l.getPath.toString) + File.separatorChar +
        prefix + getName(l.getPath.toString))
    fs.rename(new Path(l.getPath.toString), newhdfsfileName)
    val change = s"""
      |original ${new Path(l.getPath.toString)} --> new $newhdfsfileName
      |""".stripMargin
    println(change)
  }
}

An example caller would be:

val outputfolder_Path = "/a/b/c/d/e/f/"
prefixHdfsFiles(outputfolder_Path, "myprefix_")
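For a quick sanity check of the path rewrite without a Hadoop cluster, the same prefixing logic can be sketched in plain Scala (the function name here is illustrative, not part of the def above):

```scala
// Compute where the prefix lands: same directory, prefix glued onto
// the final path component.
def prefixed(pathStr: String, prefix: String): String = {
  val slash = pathStr.lastIndexOf('/')
  pathStr.substring(0, slash + 1) + prefix + pathStr.substring(slash + 1)
}

prefixed("/a/b/part-r-00000.gz.parquet", "myprefix_")
// "/a/b/myprefix_part-r-00000.gz.parquet"
```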
Ram Ghadiyaram