When I stream data from a Delta table hosted on Azure Data Lake Storage (ADLS) Gen2, the stream runs for a while before failing with the error below. The error says that the path doesn't exist, but the storage logs show files being successfully written to and read from that path both before and after the error. It seems safe to say that the path does exist in Azure Storage, despite the exception.
For context:
- I am using Spark 3.1 (PySpark).
- I have a separate stream actively writing data to the delta table via a `ForeachBatch` sink.
- The delta table is a managed table.
- This happens whether the input and output streams run on the same cluster or on separate clusters.
- I am using Azure Synapse.
Fixes I've tried:
- Increasing the batch execution interval from `None` to `10 seconds`. After this, the query went from failing after ~15 minutes with the error below to failing after a little over an hour.
- Switching to a premium tier ADLS account (no effect).
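
For reference, a 10-second batch interval like the one above corresponds to a processing-time trigger on the streaming query. The following is only a minimal sketch of that kind of setup, not the actual job: `start_reader`, the checkpoint path, and the `foreachBatch` handler used as the reader's sink are hypothetical stand-ins, and `spark` is assumed to be an existing `SparkSession` with Delta Lake available.

```python
def start_reader(spark, table_name, checkpoint_path, handle_batch):
    """Start a streaming read of a managed table with a 10-second trigger.

    All argument names are hypothetical; `spark` is an existing SparkSession.
    """
    return (
        spark.readStream
        .table(table_name)                      # streaming read of the catalog table
        .writeStream
        .foreachBatch(handle_batch)             # placeholder sink for illustration
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime="10 seconds")   # batch interval; omitting it runs batches back to back
        .start()
    )
```

A call might look like `start_reader(spark, "my_table", "/tmp/checkpoints/reader", lambda df, batch_id: df.count())`, where the checkpoint path is again a made-up value.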
I found one other person with this error (https://github.com/delta-io/delta/issues/932), but no solution was provided since it was asked to the wrong audience. Based on their issue, it seems that a simple repro can be made by reading from and writing to a delta table hosted on ADLS Gen2 with Spark streams.
How can I pin down the root cause? Are there any Spark or ADLS settings I can change to mitigate this?
22/03/19 02:06:20 ERROR MicroBatchExecution: Query [id = 00f1d866-74a2-42f9-8fb6-c8d1a76e00a6, runId = 902f8480-4dc6-4a7d-aada-bfe3b660d288] terminated with error
java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, GET, https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist. RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1178)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:408)
at org.apache.spark.sql.delta.storage.HadoopFileSystemLogStore.listFrom(HadoopFileSystemLogStore.scala:69)
at org.apache.spark.sql.delta.DeltaLog.getChanges(DeltaLog.scala:227)
at org.apache.spark.sql.delta.sources.DeltaSource.filterAndIndexDeltaLogs$1(DeltaSource.scala:190)
at org.apache.spark.sql.delta.sources.DeltaSource.getFileChanges(DeltaSource.scala:203)
at org.apache.spark.sql.delta.sources.DeltaSourceBase.getFileChangesAndCreateDataFrame(DeltaSource.scala:117)
at org.apache.spark.sql.delta.sources.DeltaSourceBase.getFileChangesAndCreateDataFrame$(DeltaSource.scala:112)
at org.apache.spark.sql.delta.sources.DeltaSource.getFileChangesAndCreateDataFrame(DeltaSource.scala:144)
at org.apache.spark.sql.delta.sources.DeltaSource.getBatch(DeltaSource.scala:385)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:486)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:482)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:482)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: Operation failed: "The specified path does not exist.", 404, GET, https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist. RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:207)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.listPath(AbfsClient.java:231)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:905)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:876)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:858)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:405)
... 37 more
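
To line the failing request up with the storage logs, it can help to pull the request details out of the exception message. The sketch below is a small standard-library helper that does this for a message shaped like the one above; `parse_abfs_error` is a hypothetical name, and the regular expressions assume the exact message layout shown in this trace, which may differ in other versions of the ABFS driver.

```python
import re
from urllib.parse import parse_qs, urlsplit

# Message copied from the stack trace above.
SAMPLE_MESSAGE = (
    'Operation failed: "The specified path does not exist.", 404, GET, '
    "https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem"
    "&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log"
    "&timeout=90&recursive=false, PathNotFound, "
    '"The specified path does not exist. '
    'RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"'
)


def parse_abfs_error(message):
    """Extract request details from an ABFS 'Operation failed' message.

    Fields that cannot be found are returned as None.
    """
    url = re.search(r"https://[^\s,]+", message)
    status = re.search(r",\s*(\d{3}),\s*([A-Z]+),", message)
    request_id = re.search(r"RequestId:([0-9a-fA-F-]+)", message)
    timestamp = re.search(r"Time:([0-9T:.\-]+Z)", message)

    parts = urlsplit(url.group(0)) if url else None
    query = parse_qs(parts.query) if parts else {}

    return {
        "account_host": parts.netloc if parts else None,
        "filesystem": parts.path.lstrip("/") if parts else None,
        "directory": query.get("directory", [None])[0],
        "status": int(status.group(1)) if status else None,
        "method": status.group(2) if status else None,
        "request_id": request_id.group(1) if request_id else None,
        "time": timestamp.group(1) if timestamp else None,
    }


if __name__ == "__main__":
    for key, value in parse_abfs_error(SAMPLE_MESSAGE).items():
        print(f"{key}: {value}")
```

The `request_id` and `time` fields are the ones to search for in the storage account's diagnostic logs, and `directory` shows that the failing call was a listing of the table's `_delta_log` folder rather than a read of a data file.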