
I am running Spark Structured Streaming in a test environment. From time to time the job fails because a checkpoint file cannot be found.

One possible reason is that the Kafka topic has a very short retention time, but I have already added `.option("failOnDataLoss", "false")` to handle that.

I have only a very basic understanding of Spark checkpointing. I assumed that deleting the checkpoint directory would let the job start fresh and recover, but in my tests, once this error happens, deleting the directory doesn't help; I have to switch to a different checkpoint directory to fix it.
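
For reference, the job is set up roughly like this (broker, topic, sink and the aggregation are simplified placeholders, not my exact code; the real query does include a stateful step, which is what writes the state files in the error below):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("checkpoint-test")                          // placeholder app name
      .getOrCreate()
    import spark.implicits._

    // failOnDataLoss is set on the Kafka source, so offsets that expired
    // because of the short topic retention should not kill the job.
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")    // placeholder
      .option("subscribe", "test-topic")                   // placeholder
      .option("failOnDataLoss", "false")
      .load()

    // Simplified stand-in for the stateful part of the query.
    val counts = input
      .selectExpr("CAST(value AS STRING) AS key")
      .groupBy($"key")
      .count()

    // Checkpoint directory; this matches the path in the stack trace below.
    counts.writeStream
      .outputMode("complete")
      .format("console")                                   // placeholder sink
      .option("checkpointLocation", "/user/spark/consumer-cp3")
      .start()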

Is there any reason why deleting the checkpoint directory doesn't work? Or is there a way or option that can help avoid this error?

     diagnostics: User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 14, master-testspark.runspark.com, executor 2): java.lang.IllegalStateException: Error reading delta file consumer-cp3/state/0/4/1.delta of HDFSStateStoreProvider[id = (op=0, part=4), dir = consumer-cp3/state/0/4]: consumer-cp3/state/0/4/1.delta does not exist
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:265)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:200)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2025)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1996)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

    at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1225)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1538)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:332)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:327)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:340)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:786)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:407)
    ... 27 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2025)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1996)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:272)
    at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
    at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1238)
    ... 39 more
DeepNightTwo
  • Did you manage to find out the cause of the problem? I have been facing the exact same issue... – Ander Nov 26 '18 at 19:22
  • Would second that comment above – Kaspar Dec 12 '18 at 15:04
  • I am also facing the same issue. Any solution for this? – BigD Jan 25 '19 at 11:34
  • same issue here. looks like Spark is trying to find the recovery intermediate state from the previous run, but there is just no data under that state folder. – linehrr May 30 '19 at 22:04
  • I think you'll find this relevant https://stackoverflow.com/questions/42006664/apache-spark-structured-streaming-s3-checkpoint-support – EnvyChan Nov 19 '19 at 13:14

2 Answers

  • executor memory not sufficient
  • batch size too big
  • timestamp field not right
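
For what it's worth, here is a rough sketch of the kinds of settings those three points refer to (all names and values below are illustrative, not tuned recommendations):

    import org.apache.spark.sql.SparkSession

    // Executor memory is a submit/cluster-level setting.
    val spark = SparkSession.builder()
      .appName("streaming-job")                            // placeholder
      .config("spark.executor.memory", "4g")               // illustrative value
      .getOrCreate()

    // maxOffsetsPerTrigger caps how much Kafka data each micro-batch reads,
    // i.e. it bounds the batch size.
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")    // placeholder
      .option("subscribe", "test-topic")                   // placeholder
      .option("maxOffsetsPerTrigger", "10000")             // illustrative value
      .load()

    // If the query uses event time, the watermark must reference a valid
    // timestamp column ("eventTime" is a placeholder here).
    val withEventTime = input
      .selectExpr("CAST(value AS STRING) AS value", "timestamp AS eventTime")
      .withWatermark("eventTime", "10 minutes")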
ico001
  • This does not provide an answer to the question. Once you have sufficient [reputation](https://stackoverflow.com/help/whats-reputation) you will be able to [comment on any post](https://stackoverflow.com/help/privileges/comment); instead, [provide answers that don't require clarification from the asker](https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-can-i-do-instead). - [From Review](/review/late-answers/33998213) – ugexe Mar 16 '23 at 00:32
  • Insufficient `executor` memory doesn't look likely – greybeard Mar 20 '23 at 15:37

Checkpointing is the process of truncating the RDD lineage graph and saving it to a reliable distributed file system (such as HDFS) or local file system.

From the Apache Spark documentation, there are two types of checkpointing:

  • Metadata checkpointing

    Saving the information defining the streaming computation to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application.

  • Data checkpointing

    This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.

If you want to see how it works internally, check the Spark blog post by Tathagata Das, "Fault-tolerance using checkpointing".

Spark uses the checkpoint to recover from failure, not to recreate a directory you have deleted. That's why you are getting this exception:

java.io.FileNotFoundException: File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
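
To see how that file relates to the checkpoint directory: the state store lays its files out as <checkpointLocation>/state/<operatorId>/<partitionId>/<version>.delta, next to the offsets/ and commits/ metadata of the query. A small sketch, with the values taken straight from the error message:

    // How the missing file maps onto the checkpoint layout.
    val checkpointLocation = "/user/spark/consumer-cp3"
    val operatorId = 0        // "op=0"   in the error: first stateful operator
    val partitionId = 4       // "part=4" in the error: the failing task's partition
    val version = 1           // state/batch version
    val deltaFile =
      s"$checkpointLocation/state/$operatorId/$partitionId/${version}.delta"
    // => /user/spark/consumer-cp3/state/0/4/1.delta, exactly the file Spark cannot find.
    // The same checkpoint directory also holds the offsets/ and commits/ metadata
    // that Spark reads when the query restarts.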
Indrajit Swain
  • My application is Spark Structured Streaming. Maybe it is not the same as Spark Streaming regarding checkpoint handling. Based on the post, the checkpoint files will not be read unless there is a failure and Spark tries to recover. But I didn't notice a failure; the only failure is the FileNotFound exception. So under what conditions will Spark try to read checkpoint files without any previous failure? – DeepNightTwo Feb 02 '18 at 13:41
  • @DeepNightTwo Did you find any solution to this issue? – Kaspar Dec 12 '18 at 14:17
  • mine is also structured streaming, same issue here. – linehrr May 30 '19 at 22:05