18

In Spark SQL, how should we read data from one folder in HDFS, apply some modifications, and save the updated data back to the same HDFS folder using the Overwrite save mode, without getting a FileNotFoundException?

import org.apache.spark.sql.{SparkSession,SaveMode}
import org.apache.spark.SparkConf

val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a","b","c")

newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works

The FileNotFoundException occurs when we read data from the HDFS directory "d=2017-03-20" and save (SaveMode.Overwrite) the updated data back to the same HDFS directory "d=2017-03-20":

Caused by: org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
  ... 8 more

The following attempts still produce the same error. How should I solve this problem using Spark SQL? Thank you!

val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"

val df = sparkSession.read.parquet(hdfsDirPath)

val newdf = df
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

// or

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("TRUNCATE TABLE orgtable")
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable")

val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

// or

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("REFRESH TABLE orgtable")
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable")
    
val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
faustineinsun
  • I don't think it can be done; you need to save it to another directory and then copy it to the original directory. – Akash Sethi Mar 21 '17 at 07:44
  • We cannot write a Spark DataFrame to the location we are reading it from. If you still want to do it, first write the DF to a temporary directory and then write it to the target directory with `SaveMode.Overwrite`. – himanshuIIITian Mar 21 '17 at 09:32
  • Thank you Akash and himanshuIIITian. Writing the DF to a temp dir on HDFS is a good solution, but I was wondering if there's a way to solve this with Spark SQL alone, since writing to and fetching from HDFS is less time- and space-efficient than keeping everything in memory. Can we use REFRESH, TRUNCATE, or DROP TABLE to solve the problem? – faustineinsun Mar 21 '17 at 20:14
  • I have the same issue, even though I am writing the df to a temp table in Java. – dileepVikram May 09 '18 at 12:18
  • @faustineinsun How did you solve this? Did you find any other way other than temp directory? – ss301 Feb 11 '20 at 18:03

4 Answers

13

I solved this: first I write my DataFrame to a temp directory, then delete the source directory I was reading from, and finally rename the temp directory to the source name. QAQ
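For reference, a minimal sketch of that workaround using the Hadoop FileSystem API might look like the following (the paths and column names are the question's placeholders, `sparkSession` is the session created in the question, and the `_tmp` directory name is made up):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SaveMode

    // Placeholder paths from the question; the "_tmp" suffix is invented for illustration
    val srcDir = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
    val tmpDir = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20_tmp"

    val newDF = sparkSession.read.parquet(srcDir).select("a", "b", "c")

    // 1. Write the result to a temporary directory first
    newDF.write.mode(SaveMode.Overwrite).parquet(tmpDir)

    // 2. Delete the source directory and rename the temp directory to take its place
    val fs = new Path(srcDir).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
    fs.delete(new Path(srcDir), true)               // recursive delete of the old data
    fs.rename(new Path(tmpDir), new Path(srcDir))   // move the new data into place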

廖梓帆
  • Thank you! I was wondering if we can use REFRESH, TRUNCATE, or DROP TABLE to solve the problem? Doing the operations in memory would be more efficient than doing the same things on disk. – faustineinsun Mar 21 '17 at 20:18
  • This is the only answer that works. The other answer is wrong. I upvoted the other answer myself at first, but couldn't retract my upvote; it says it's locked in until I edit the question. – pavel_orekhov Jan 16 '19 at 17:26
9

Why don't you just cache it after reading it? Saving it to another directory and then moving the directory might entail some extra permissions. I have also been forcing an action, like a show().

val myDF = spark.read.format("csv")
    .option("header", "false")
    .option("delimiter", ",")
    .load("/directory/tofile/")


myDF.cache()
myDF.show(2)
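
Adapted to the parquet scenario from the question, a minimal sketch might look like this (using the question's placeholder path and `sparkSession`; whether this avoids the error depends on the whole dataset actually fitting in the cache before the overwrite runs):

    import org.apache.spark.sql.SaveMode

    val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"

    val df = sparkSession.read.parquet(hdfsDirPath).select("a", "b", "c")

    // Materialize the data in memory before touching the source directory
    df.cache()
    df.count()   // force a full action so the cache is actually populated

    // Only then overwrite the directory we read from
    df.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)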
uh_big_mike_boi
  • `cache()` is an alias for `persist(StorageLevel.MEMORY_ONLY)`, which may not be good for datasets that are larger than cluster memory. `persist(StorageLevel.MEMORY_AND_DISK_ONLY)` would be a better choice, but it spills data to local disk when memory is not enough. I used `persist` before, but it didn't seem to work. – faustineinsun Oct 26 '17 at 21:49
  • Did you try calling an action after the cache? Is the "overwrite" still your first action? – uh_big_mike_boi Oct 27 '17 at 15:45
  • This didn't work for me. Liao's answer (save to a temp dir) did work. – Jake Jul 29 '18 at 21:19
  • You need to cache and then call an action afterwards (like show) to force Spark to materialize the data before doing anything else. In that case it should work. Tested with Spark 2.4.x. – sdikby Oct 23 '19 at 14:57
  • @sdikby I tried the same, with cache and show, but it's throwing an error if I run a count on the dataset. – anidev711 Nov 27 '19 at 16:37
3

I faced a similar issue. I was writing a DataFrame to a Hive table using the code below:

dataframe.write.mode("overwrite").saveAsTable("mydatabase.tablename")   

When I tried to query this table, I was getting the same error. I then added the line of code below after creating the table, to refresh it, which solved the issue:

spark.catalog.refreshTable("mydatabase.tablename")
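
Put together, the sequence might look roughly like this (same example database and table names as above; the final read-back is only there to illustrate that subsequent queries succeed):

    // Overwrite the Hive table
    dataframe.write.mode("overwrite").saveAsTable("mydatabase.tablename")

    // Invalidate Spark's cached metadata and file listing for the table
    spark.catalog.refreshTable("mydatabase.tablename")

    // Subsequent reads now see the new files instead of the deleted ones
    spark.table("mydatabase.tablename").show(5)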
Sarath Subramanian
  • `spark.sql("refresh mydatabase.tablename") ` did not work for me, but your solution solved the issue. Thank you – Joha Nov 09 '20 at 09:40
1
val dfOut = df.filter(r => r.getAs[Long]("dsctimestamp") > (System.currentTimeMillis() - 1800000))

In the above line of code, df had an underlying Hadoop partition. Once I had made this transformation (i.e., to dfOut), I could not find a way to delete, rename, or overwrite the underlying partition until dfOut had been garbage collected.

My solution was to keep the old partition, create a new partition for dfOut, flag the new partition as current and then delete the old partition some given time later, after dfOut had been garbage collected.

It may not be an ideal solution; I would love to learn a less tortuous way of addressing this issue, but it works.
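
A rough sketch of that pattern, with made-up versioned directory names standing in for the real partition layout and the deferred delete left as a commented cleanup step:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SaveMode

    // Hypothetical versioned directories; the "_v1"/"_v2" names are invented for illustration
    val oldDir = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20_v1"
    val newDir = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20_v2"

    val dfOut = sparkSession.read.parquet(oldDir)
      .filter(r => r.getAs[Long]("dsctimestamp") > (System.currentTimeMillis() - 1800000))

    // Write the result to a fresh directory; the old partition stays readable the whole time
    dfOut.write.mode(SaveMode.Overwrite).parquet(newDir)

    // Point downstream jobs at newDir ("flag it as current"), then remove the old
    // directory later, once nothing still holds a reference to the old files
    val fs = new Path(oldDir).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
    // fs.delete(new Path(oldDir), true)   // run this as a deferred cleanup step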

Jake