We have a Spark Structured Streaming job written in Scala running in production. It reads from a Kafka topic and writes to an HDFS sink, with a trigger interval of 60 seconds. The job was deployed 4 months ago and, after running well for a month, started failing instantly with the error below:
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/XYZ/hive/prod/landing_dir/abc/_spark_metadata/.edfac8fb-aa6d-4c0e-aa19-48f44cc29774.tmp (inode 6736258272) Holder DFSClient_NONMAPREDUCE_841796220_49 does not have any open files
Earlier this issue was infrequent, happening once every 2-3 weeks. Over the last month the error has become much more frequent, occurring every 3-4 days and failing the job. We restart the job once a week as part of regular maintenance. The Spark version is 2.3.2 and we run on YARN as the cluster manager. From the error it appears that something is going wrong in the write-ahead log (WAL) directory, since the path points to _spark_metadata. I would like to understand what is causing this exception and how we can handle it. Is this something we can handle in our application, or is it an environment issue that needs to be addressed at the infra level? Below is the code snippet:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession
  .builder
  .master(StreamerConfig.sparkMaster)
  .appName(StreamerConfig.sparkAppName)
  .getOrCreate()

spark.conf.set("spark.sql.orc.impl", "native")
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.shuffle.service.enabled", "true")

// Read from the Kafka topic
val readData = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", StreamerConfig.kafkaBootstrapServer)
  .option("subscribe", StreamerConfig.topicName)
  .option("failOnDataLoss", false)
  .option("startingOffsets", StreamerConfig.kafkaStartingOffset)
  .option("maxOffsetsPerTrigger", StreamerConfig.maxOffsetsPerTrigger)
  .load()

// Deserialize the Kafka records into the target schema
val deserializedRecords = StreamerUtils.deserializeAndMapData(readData, spark)

// Write to the HDFS landing path as ORC, partitioned by date and hour
val streamingQuery = deserializedRecords.writeStream
  .queryName(s"Persist data to hive table for ${StreamerConfig.topicName}")
  .outputMode("append")
  .format("orc")
  .option("path", StreamerConfig.hdfsLandingPath)
  .option("checkpointLocation", StreamerConfig.checkpointLocation)
  .partitionBy("date", "hour")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime(StreamerConfig.triggerTime))
  .start()
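
In case it helps with diagnosis, below is a rough sketch (not part of the production job, with the landing path hard-coded purely for illustration) of how we could list the _spark_metadata directory under the output path to look for leftover .tmp batch files like the one named in the error:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object MetadataDirInspector {
  def main(args: Array[String]): Unit = {
    // Illustrative path only; the real job uses StreamerConfig.hdfsLandingPath
    val landingPath = "/user/XYZ/hive/prod/landing_dir/abc"
    val metadataDir = new Path(s"$landingPath/_spark_metadata")

    val fs = FileSystem.get(new Configuration())
    if (fs.exists(metadataDir)) {
      // Leftover .tmp files from an interrupted metadata commit would show up in this listing
      fs.listStatus(metadataDir).foreach { status =>
        println(s"${status.getPath.getName}\t${status.getLen} bytes\tmodified ${status.getModificationTime}")
      }
    } else {
      println(s"No _spark_metadata directory found under $landingPath")
    }
    fs.close()
  }
}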