I have multiple jobs that I want to execute in parallel that append daily data into the same path using partitioning.

e.g.

dataFrame.write()
         .partitionBy("eventDate", "category")
         .mode(Append)
         .parquet("s3://bucket/save/path");

Job 1 - category = "billing_events"

Job 2 - category = "click_events"

Both of these jobs truncate any partitions of theirs that already exist in the S3 bucket prior to execution, and then save the resulting Parquet files to their respective partitions.

i.e.

Job 1 -> s3://bucket/save/path/eventDate=20160101/channel=billing_events

Job 2 -> s3://bucket/save/path/eventDate=20160101/channel=click_events

The problem I'm facing is with the temporary files that Spark creates during job execution. It saves its working files under the base path:

s3://bucket/save/path/_temporary/...

so both jobs end up sharing the same temp folder and conflict with each other. I've noticed this can cause one job to delete temp files, and the other job to fail with a 404 from S3 saying an expected temp file doesn't exist.

Has anyone faced this issue and come up with a strategy to have parallel execution of jobs in the same base path?

I'm using Spark 1.6.0 for now.

vcetinick
  • You could use the direct output committer. Since it doesn't use a temp folder, there will be no conflict. – Tal Joffe Aug 23 '16 at 11:57
  • Is there any risk that the direct output committer writes data to the bucket, a job then fails, and partial data is left in S3 that never gets cleaned up? – vcetinick Aug 25 '16 at 21:11
  • Yes, definitely. Since the commit to the file system is done at the task level (i.e. for each output file), you could end up with partial data. The way we solve this is to write (directly) to a temp folder and copy it to the final destination after the Spark job finishes (using an S3DistCp step). I looked for a better way to do this but couldn't find one. – Tal Joffe Aug 28 '16 at 06:06
  • What a great question. I was about to embark on something similar to this but had not considered the shared `_temporary` folder (what a bad idea, by the way). – WestCoastProjects Jun 27 '18 at 05:57
  • Is there any clean solution to this problem? I checked vcetinick's solution, which will definitely work, but is there a solution from Spark itself in the latest versions? – Devavrata Dec 16 '19 at 09:57
  • @TalJoffe I am also facing a similar issue with Spark on AWS EMR. I am using Hadoop 2.8.5 and Spark 2.4. Is there a clean solution yet? – Vikram Ranabhatt Jan 10 '20 at 16:28

4 Answers

So after much reading about how to tackle this problem, I thought I'd transfer some wisdom back here to wrap things up. Thanks mostly to Tal's comments.

I've additionally found that writing directly to s3://bucket/save/path seems dangerous: if a job is killed and the cleanup of the temporary folder doesn't happen at the end of the job, the folder seems to be left there for the next job, and I've noticed that sometimes the temp files of a previously killed job land in s3://bucket/save/path and cause duplication. Totally unreliable.

Additionally, renaming the _temporary folder files to their final S3 locations takes a horrendous amount of time (approx. 1 sec per file), as S3 only supports copy/delete, not rename. On top of that, only the driver renames these files, using a single thread, so as much as 1/5 of the run time of some jobs with large numbers of files/partitions is spent just waiting for rename operations.

I've ruled out using the DirectOutputCommitter for a number of reasons.

  1. When used in conjunction with speculation mode it results in duplication (https://issues.apache.org/jira/browse/SPARK-9899)
  2. Task failures will leave clutter which would be impossible to find and remove/clean later.
  3. Spark 2.0 has removed support for this completely and no upgrade path exists (https://issues.apache.org/jira/browse/SPARK-10063).

The only safe, performant, and consistent way to execute these jobs is to save them to a unique temporary folder (unique by applicationId or timestamp) in HDFS first, and copy to S3 on job completion.

This allows concurrent jobs to execute, as each saves to its own unique temp folder; there is no need for the DirectOutputCommitter, as the rename operation on HDFS is quicker than on S3; and the saved data is more consistent.
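
As a rough illustration of the staging pattern described above, here is a minimal Java sketch that uses the local filesystem as a stand-in for both HDFS and S3. The class `StagedCommit`, its methods, and the `application_0001` id are hypothetical names made up for this example; in a real job the final copy would be done by something like S3DistCp rather than `Files.move`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: not part of Spark or Hadoop.
public class StagedCommit {

    /** Staging directory that is unique to one job run, e.g. <stagingRoot>/<applicationId>. */
    static Path stagingDir(Path stagingRoot, String applicationId) {
        return stagingRoot.resolve(applicationId);
    }

    /**
     * Moves every regular file under the staging directory to the same relative
     * location under finalRoot and returns the number of files moved.
     * Files.move is called without REPLACE_EXISTING, so a file name collision
     * fails loudly instead of silently overwriting another job's output.
     */
    static int publish(Path staging, Path finalRoot) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(staging)) {
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        for (Path file : files) {
            Path target = finalRoot.resolve(staging.relativize(file));
            Files.createDirectories(target.getParent());
            Files.move(file, target);
        }
        return files.size();
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("staged-commit");
        Path finalRoot = root.resolve("save/path");

        // Each job writes only below its own staging directory ...
        Path staging = stagingDir(root.resolve("staging"), "application_0001");
        Path part = staging.resolve("eventDate=20160101/category=billing_events/part-00000.parquet");
        Files.createDirectories(part.getParent());
        Files.write(part, new byte[] {1, 2, 3});

        // ... and publishes into the shared base path once it has finished.
        int moved = publish(staging, finalRoot);
        System.out.println("published " + moved + " file(s) to " + finalRoot);
    }
}
```

Because every run has its own staging directory, a killed job leaves its leftovers only there, where they can be deleted without touching the shared base path.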

vcetinick
  • Thanks for these valuable insights. This is a more complicated endeavor than it seems it should be. – WestCoastProjects Jun 27 '18 at 05:59
  • Just an addition to this answer. I did a similar thing. You can use **s3distcp** (https://aws.amazon.com/blogs/big-data/seven-tips-for-using-s3distcp-on-amazon-emr-to-move-data-efficiently-between-hdfs-and-amazon-s3/) to achieve that. But I have not yet managed to solve the scenario where I use `saveAsTable`. Manual creation of the table works, though. – gorros Sep 18 '18 at 05:46
  • @vcetinick I am also facing this issue. Thanks for the solution. Is there a solution for this in Spark 2.4.4 and Hadoop 2.8.5? – Vikram Ranabhatt Jan 10 '20 at 16:30
  • This still stands on Spark 2.4.0.cloudera2 with HDFS (Parquet, but that's irrelevant). It's not safe to append to the same directory from multiple application runs. Not even sequentially! If a run crashes, you can't know how it left things. For example, if it couldn't delete /_temporary/0, the next commit from another application will also commit the rows left there. We use tables like /data/foo/batch=1234, instead of just /data/foo, and each run has its own batch. You can still read /data/foo as a single table (which has an extra "batch" column), just don't ever append to it at that level. – ddekany Feb 09 '20 at 17:48

Instead of using partitionBy:

dataFrame.write()
         .partitionBy("eventDate", "category")
         .mode(Append)
         .parquet("s3://bucket/save/path");

you can write each job's files directly to its partition path.

In job 1, specify the Parquet path as:

dataFrame.write().mode(Append)
         .parquet("s3://bucket/save/path/eventDate=20160101/channel=billing_events");

and in job 2, specify it as:

dataFrame.write().mode(Append)
         .parquet("s3://bucket/save/path/eventDate=20160101/channel=click_events");

  1. Both jobs will create a separate _temporary directory under their respective folders, so the concurrency issue is solved.
  2. Partition discovery will still pick up eventDate=20160101 and the channel column.
  3. Disadvantage: even if no rows with channel=click_events exist in the data, a Parquet file for channel=click_events will still be created.
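
To avoid hand-writing those paths in every job, the partition directory can be built from the column values. The following is only a sketch: `PartitionPaths` and `partitionPath` are hypothetical names, and unlike Spark it does not escape special characters in the values, so it assumes plain values such as dates and simple identifiers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: not part of Spark.
public class PartitionPaths {

    /**
     * Appends one "column=value" directory per entry, in iteration order,
     * to the base path. Values are NOT escaped (assumption: no '/', '=' etc.).
     */
    static String partitionPath(String basePath, Map<String, String> partitionValues) {
        // Drop trailing slashes so the result never contains "//".
        StringBuilder path = new StringBuilder(basePath.replaceAll("/+$", ""));
        for (Map.Entry<String, String> entry : partitionValues.entrySet()) {
            path.append('/').append(entry.getKey()).append('=').append(entry.getValue());
        }
        return path.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, which defines the directory nesting.
        Map<String, String> values = new LinkedHashMap<>();
        values.put("eventDate", "20160101");
        values.put("channel", "billing_events");
        System.out.println(partitionPath("s3://bucket/save/path", values));
        // prints: s3://bucket/save/path/eventDate=20160101/channel=billing_events
    }
}
```

The resulting string is what each job would pass to `.parquet(...)` in place of the shared base path.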
Uwe Allner
parthiv
  • This doesn't feel ideal. You would need to iterate over each of the channels per day, filter out the irrelevant channel data, and then save into the correct partition folder. This is handled automatically by the partitionBy method. – vcetinick Feb 09 '17 at 22:41

I suspect this is because of the changes to partition discovery that were introduced in Spark 1.6. The changes mean that Spark will only treat paths like .../xxx=yyy/ as partitions if you have specified a "basepath" option (see the Spark release notes).

So I think your problem will be solved if you add the basepath option, like this:

dataFrame
  .write()
  .partitionBy("eventDate", "category")
  .option("basepath", "s3://bucket/save/path")
  .mode(Append)
  .parquet("s3://bucket/save/path");

(I haven't had the chance to verify it, but hopefully it will do the trick :))

Glennie Helles Sindholt
  • I'm not having any issues with the partitioning per se; it's more of a concurrency issue. While jobs are executing, some internal mechanism creates temporary files at the base path s3://bucket/save/path/_temporary, which means two jobs cannot effectively run on the same base path. Also, I'm not sure that setting the base path is mandatory for partition discovery; I think it is needed only if you have pointed to a child path rather than the base path when creating the DataFrame. – vcetinick Aug 16 '16 at 11:17
  • @vcetinick It seems we are experiencing similar issues using HDFS. I'll do some more investigation and potentially file a bug in the Spark issue tracker. – Base_v Sep 12 '16 at 11:07

Multiple write tasks to the same path with "partitionBy" will fail when _temporary has been deleted in cleanupJob of FileOutputCommitter, with an error like "No such file or directory".

TEST CODE:

def batchTask[A](TASK_tag: String, taskData: TraversableOnce[A], batchSize: Int, fTask: A => Unit, fTaskId: A => String): Unit = {
  var list = new scala.collection.mutable.ArrayBuffer[(String, java.util.concurrent.Future[Int])]()
  val executors = java.util.concurrent.Executors.newFixedThreadPool(batchSize)
  try {
    taskData.foreach(d => {
      val task = executors.submit(new java.util.concurrent.Callable[Int] {
        override def call(): Int = {
          fTask(d)
          1
        }
      })
      list += ((fTaskId(d), task))
    })
    var count = 0
    list.foreach(r => if (!r._2.isCancelled) count += r._2.get())
  } finally {
    executors.shutdown()
  }
}
def testWriteFail(outPath: String)(implicit spark: SparkSession, sc: SparkContext): Unit = {
  println(s"try save: ${outPath}")
  import org.apache.spark.sql.functions._
  import spark.sqlContext.implicits._
  batchTask[Int]("test", 1 to 20, 6, t => {
    val df1 =
      Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")))
        .toDF("int_column", "string_column", "date_column")
        .withColumn("t0", lit(t))
    df1.repartition(1).write
      .mode("overwrite")
      .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
      .partitionBy("t0").csv(outPath)
  }, t => f"task.${t}%4d") // some Exception
  println(s"fail: count=${spark.read.csv(outPath).count()}")
}
try {
  testWriteFail(outPath + "/fail")
} catch {
  case e: Throwable =>
}

Failed

Use a custom OutputCommitter:

package org.jar.spark.util
import java.io.IOException
/*
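  * Lets multiple DataFrame write jobs write into the same directory.
  * <pre>
  * 1. Writes go through a temporary (cache) directory.
  * 2. If the outputs of different tasks may overlap, do not use overwrite mode, to avoid deleting data by mistake.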
  * </pre>
  * <p/>
  * Created by liao on 2018-12-02.
  */
object JMultiWrite {
  val JAR_Write_Cache_Flag = "jar.write.cache.flag"
  val JAR_Write_Cache_TaskId = "jar.write.cache.taskId"
  /** Automatically delete same-named subdirectories under the target directory. */
  val JAR_Write_Cache_Overwrite = "jar.write.cache.overwrite"
  implicit class ImplicitWrite[T](dw: org.apache.spark.sql.DataFrameWriter[T]) {
    /**
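      * Writes to files; option, format, mode, etc. must be configured by the caller.
      *
      * @param outDir    target output directory
      * @param taskId    ID of this task, used to isolate each task's output; must be unique
      * @param cacheDir  cache directory, preferably one whose name starts with '_', e.g. "_jarTaskCache"
      * @param overwrite whether to delete directories that already exist; the default false means append mode
      *                  <font color=red>(if parallel tasks may output to the same subdirectory, data will be wiped out; do not use overwrite in that case)</font>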
      */
    def multiWrite(outDir: String, taskId: String, cacheDir: String = "_jarTaskCache", overwrite: Boolean = false): Boolean = {
      val p = path(outDir, cacheDir, taskId)
      dw.options(options(cacheDir, taskId))
        .option(JAR_Write_Cache_Overwrite, overwrite)
        .mode(org.apache.spark.sql.SaveMode.Overwrite)
        .save(p)
      true
    }
  }
  def options(cacheDir: String, taskId: String): Map[String, String] = {
    Map(JAR_Write_Cache_Flag -> cacheDir,
      JAR_Write_Cache_TaskId -> taskId,
      "mapreduce.fileoutputcommitter.marksuccessfuljobs" -> "false",
      "mapreduce.job.outputformat.class" -> classOf[JarOutputFormat].getName
    )
  }
  def path(outDir: String, cacheDir: String, taskId: String): String = {
    assert(outDir != "", "need OutDir")
    assert(cacheDir != "", "need CacheDir")
    assert(taskId != "", "needTaskId")
    outDir + "/" + cacheDir + "/" + taskId
  }
  /*-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-o-*/
  class JarOutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat {
    var committer: org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter = _

    override def getOutputCommitter(context: org.apache.hadoop.mapreduce.TaskAttemptContext): org.apache.hadoop.mapreduce.OutputCommitter = {
      if (this.committer == null) {
        val output = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(context)
        this.committer = new JarOutputCommitter(output, context)
      }
      this.committer
    }
  }
  class JarOutputCommitter(output: org.apache.hadoop.fs.Path, context: org.apache.hadoop.mapreduce.TaskAttemptContext)
    extends org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(output, context) {
    override def commitJob(context: org.apache.hadoop.mapreduce.JobContext): Unit = {
      val finalOutput = this.output
      val cacheFlag = context.getConfiguration.get(JAR_Write_Cache_Flag, "")
      val myTaskId = context.getConfiguration.get(JAR_Write_Cache_TaskId, "")
      val overwrite = context.getConfiguration.getBoolean(JAR_Write_Cache_Overwrite, false)
      val hasCacheFlag = finalOutput.getName == myTaskId && finalOutput.getParent.getName == cacheFlag
      val finalReal = if (hasCacheFlag) finalOutput.getParent.getParent else finalOutput // determine the final directory
      // list the output directory
      val fs = finalOutput.getFileSystem(context.getConfiguration)
      val jobAttemptPath = getJobAttemptPath(context)
      val arr$ = fs.listStatus(jobAttemptPath, new org.apache.hadoop.fs.PathFilter {
        override def accept(path: org.apache.hadoop.fs.Path): Boolean = !"_temporary".equals(path.getName())
      })
      if (hasCacheFlag && overwrite) // remove same-named subdirectories
      {
        if (fs.isDirectory(finalReal)) arr$.foreach(stat =>
          if (fs.isDirectory(stat.getPath)) fs.listStatus(stat.getPath).foreach(stat2 => {
            val p1 = stat2.getPath
            val p2 = new org.apache.hadoop.fs.Path(finalReal, p1.getName)
            if (fs.isDirectory(p1) && fs.isDirectory(p2) && !fs.delete(p2, true)) throw new IOException("Failed to delete " + p2)
          })
        )
      }
      arr$.foreach(stat => {
        mergePaths(fs, stat, finalReal)
      })
      cleanupJob(context)
      if (hasCacheFlag) { // remove the cache directory
        try {
          fs.delete(finalOutput, false)
          val pp = finalOutput.getParent
          if (fs.listStatus(pp).isEmpty)
            fs.delete(pp, false)
        } catch {
          case e: Exception =>
        }
      }
      // no need to write _SUCCESS any more
      //if (context.getConfiguration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
      //  val markerPath = new org.apache.hadoop.fs.Path(this.outputPath, "_SUCCESS")
      //  fs.create(markerPath).close()
      //}
    }
  }
  @throws[IOException]
  def mergePaths(fs: org.apache.hadoop.fs.FileSystem, from: org.apache.hadoop.fs.FileStatus, to: org.apache.hadoop.fs.Path): Unit = {
    if (from.isFile) {
      if (fs.exists(to) && !fs.delete(to, true)) throw new IOException("Failed to delete " + to)
      if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
    }
    else if (from.isDirectory) if (fs.exists(to)) {
      val toStat = fs.getFileStatus(to)
      if (!toStat.isDirectory) {
        if (!fs.delete(to, true)) throw new IOException("Failed to delete " + to)
        if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
      }
      else {
        val arr$ = fs.listStatus(from.getPath)
        for (subFrom <- arr$) {
          mergePaths(fs, subFrom, new org.apache.hadoop.fs.Path(to, subFrom.getPath.getName))
        }
      }
    }
    else if (!fs.rename(from.getPath, to)) throw new IOException("Failed to rename " + from + " to " + to)
  }
}

And then:

def testWriteOk(outPath: String)(implicit spark: SparkSession, sc: SparkContext): Unit = {
  println(s"try save: ${outPath}")
  import org.apache.spark.sql.functions._
  import org.jar.spark.util.JMultiWrite.ImplicitWrite // import the helper
  import spark.sqlContext.implicits._
  batchTask[Int]("test.ok", 1 to 20, 6, t => {
    val taskId = t.toString
    val df1 =
      Seq((1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")))
        .toDF("int_column", "string_column", "date_column")
        .withColumn("t0", lit(taskId))
    df1.repartition(1).write
      .partitionBy("t0")
      .format("csv")
      .multiWrite(outPath, taskId, overwrite = true) // overwrite is used here; do not use overwrite if partitions overlap
  }, t => f"task.${t}%4d")
  println(s"ok: count=${spark.read.csv(outPath).count()}") // 40
}
try {
  testWriteOk(outPath + "/ok")
} catch {
  case e: Throwable =>
}

Success:

$  ls ok/
t0=1  t0=10 t0=11 t0=12 t0=13 t0=14 t0=15 t0=16 t0=17 t0=18 t0=19 t0=2  t0=20 t0=3  t0=4  t0=5  t0=6  t0=7  t0=8  t0=9

The same applies to other output formats. Pay attention to the use of overwrite.

Tested on Spark 2.11.8.

Thanks to @Tal Joffe.