I'm trying to set up the stream to begin processing incoming files. It looks like Databricks is unable to save a checkpoint. I tried locations in ADLS Gen2 and in DBFS with the same result: Databricks creates the needed folder with some structure but cannot write to it. Are there any special requirements for a checkpoint location?

Checkpoint folder

Databricks Community Edition, runtime version: 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)

spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.partitionColumns", "year, month, day")
.option("header", "true")
.schema(schema)
.load(destFolderName)
.writeStream.format("delta")
.option("checkpointLocation", checkpointPath)
.outputMode("append")
.partitionBy("year", "month", "day")
.start(outputPath)

The error:

java.lang.UnsupportedOperationException: com.databricks.backend.daemon.data.client.DBFSV1.createAtomicIfAbsent(path: Path)
at com.databricks.tahoe.store.EnhancedDatabricksFileSystemV1.createAtomicIfAbsent(EnhancedFileSystem.scala:324)
at com.databricks.spark.sql.streaming.AWSCheckpointFileManager.createAtomicIfAbsent(DatabricksCheckpointFileManager.scala:159)
at com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager.createAtomicIfAbsent(DatabricksCheckpointFileManager.scala:60)
at com.databricks.sql.streaming.state.RocksDBFileManager.zipToDbfsFile(RocksDBFileManager.scala:497)
at com.databricks.sql.streaming.state.RocksDBFileManager.saveCheckpointToDbfs(RocksDBFileManager.scala:181)
at com.databricks.sql.rocksdb.CloudRocksDB.$anonfun$open$5(CloudRocksDB.scala:451)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:627)
at com.databricks.sql.rocksdb.CloudRocksDB.timeTakenMs(CloudRocksDB.scala:527)
at com.databricks.sql.rocksdb.CloudRocksDB.$anonfun$open$2(CloudRocksDB.scala:439)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:395)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:484)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:504)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:20)

2 Answers

The Auto Loader feature, which I was trying to use, is not currently available on Databricks Community Edition:

https://databricks.com/notebooks/Databricks-Data-Integration-Demo.html

so "cloudFiles" cannot be used with Community Edition

You can try to disable multi-cluster writes:

spark.databricks.delta.multiClusterWrites.enabled false
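
In a notebook this config can be set on the Spark session, for example:

spark.conf.set("spark.databricks.delta.multiClusterWrites.enabled", "false")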

Check your path: please try to write to standard DBFS managed by Databricks (for example dbfs:/local_disk0/tmp/checkpointName).
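
For example, a one-line sketch reusing the checkpointPath variable name from the question:

checkpointPath = "dbfs:/local_disk0/tmp/checkpointName"  # Databricks-managed DBFS location suggested above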

If you use your own mount, please check the Azure permissions there (the Storage Blob Data Contributor role is necessary).

Please also diagnose the read stream on its own:

df = spark.readStream(...)
display(df)
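
For example, a read-only PySpark sketch that reuses the cloudFiles options and names (destFolderName, schema) from the question, so you can see whether the failure already happens on load():

# Build only the read side of the stream and display it without writing,
# to check whether the error is raised by the source itself.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      .schema(schema)
      .load(destFolderName))
display(df)
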
Hubert Dudek
  • Thanks for your response. I tried disabling multi-cluster writes. I can successfully read and write parquet files to DBFS and ADLS Gen2, but not in stream mode. Blob Storage Contributor is assigned. I commented out the writing part, and now reading fails, I guess at the load() line. Can this problem be related to the fact that Databricks Community Edition runs on DBFSV1? https://kb.databricks.com/delta/delta-write-fails.html – Vik Muzychko Dec 14 '21 at 10:30
  • It can be, but I wasn't aware that there is DBFSV1. Generally, Community Edition is limited and I only do simple stuff there, for example testing SQL queries. I haven't done streaming there. – Hubert Dudek Dec 14 '21 at 10:50