On Azure Databricks Runtime 9.1, I want to register a SparkListener and access dbutils features inside that listener.
The listener should log some information when the Spark application starts. As a simple example, it should list out the file system using dbutils.fs.ls.
The question "How to properly access dbutils in Scala when using Databricks Connect" is very close to what I'm looking for, but it focuses on Databricks Connect, whereas I want dbutils inside a SparkListener. It does point to the dbutils API library on the Microsoft Docs page, which seems to indicate that I need only specify the correct target and version of the dbutils-api package.
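For reference, that package is the same artifact pulled in by the build.gradle at the bottom of this post; in sbt terms (coordinates from Maven Central, version 0.0.5 being the latest I found) it would be:
libraryDependencies += "com.databricks" % "dbutils-api_2.12" % "0.0.5"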
In the sample listener below:
- If I do not include the import com.databricks.dbutils_v1.DBUtilsHolder.dbutils, the jar fails to compile, since I reference dbutils in the onApplicationStart method.
- When I do include the import, it compiles successfully.
- However, the SparkListener fails to initialize: I receive a NullPointerException as soon as it tries to execute the dbutils.fs.ls command.
Any thoughts and/or guidance would be greatly appreciated!
Sample Listener Using dbutils on Application Start
package my.custom.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}
import org.slf4j.{Logger, LoggerFactory}
// Crucial import: the dbutils handle exposed by dbutils-api
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

class LibraryListener extends SparkListener {
  private var isDatabricks = false
  val log: Logger = LoggerFactory.getLogger(classOf[LibraryListener])

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    log.info("HELLO WORLD!")
    log.info(s"App Name ${applicationStart.appName}")
    log.info(s"User ${applicationStart.sparkUser}")
    // Only touch dbutils when actually running on a Databricks cluster
    isDatabricks = sys.env.contains("DATABRICKS_RUNTIME_VERSION")
    if (isDatabricks) {
      log.info("WE ARE USING DATABRICKS!")
      // Dummy example of using dbutils
      log.info(dbutils.fs.ls("dbfs:/").mkString(", "))
    }
  }
}
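For context, the jar is installed on the cluster and the listener is registered through the cluster's Spark config (which lines up with the Utils.loadExtensions frames in the trace below):
spark.extraListeners my.custom.listener.LibraryListener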
Error Message From Spark Listener Initialization
org.apache.spark.SparkException: Exception when registering SparkListener
at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2829)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:701)
at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext$1(DatabricksILoop.scala:347)
at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
at com.databricks.backend.daemon.driver.DriverCorral.driverContext(DriverCorral.scala:229)
at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:102)
at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:50)
at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:287)
at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:362)
at com.databricks.DatabricksMain.$anonfun$main$1(DatabricksMain.scala:117)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData$1(DatabricksMain.scala:425)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:395)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:484)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:504)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:85)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:85)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:479)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:404)
at com.databricks.DatabricksMain.recordOperationWithResultTags(DatabricksMain.scala:85)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:395)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:367)
at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:85)
at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:425)
at com.databricks.DatabricksMain.main(DatabricksMain.scala:116)
at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.NullPointerException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.databricks.dbutils_v1.DBUtilsHolder$$anon$1.invoke(DBUtilsHolder.scala:17)
at com.sun.proxy.$Proxy35.fs(Unknown Source)
at my.custom.listener.LibraryListener.<init>(LibraryListener.scala:19)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3077)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3066)
at org.apache.spark.SparkContext.$anonfun$setupAndStartListenerBus$1(SparkContext.scala:2810)
at org.apache.spark.SparkContext.$anonfun$setupAndStartListenerBus$1$adapted(SparkContext.scala:2809)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2809)
... 33 more
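My reading of the trace: DBUtilsHolder appears to hand out a reflective proxy whose backing DBUtilsV1 instance is only injected by the Databricks runtime after startup, so a call made while the SparkContext is still being constructed forwards to a null target. A minimal sketch of the pattern I think is in play (FakeDbUtils and FakeHolder are hypothetical names for illustration, not the actual Databricks source):
// Hypothetical reconstruction of the failure mode, not actual Databricks code.
import java.lang.reflect.{InvocationHandler, Method, Proxy}
import java.util.concurrent.atomic.AtomicReference

trait FakeDbUtils { def fs: AnyRef }

object FakeHolder {
  // The real instance is injected later by the runtime; it starts out null.
  private val backing = new AtomicReference[FakeDbUtils](null)

  val dbutils: FakeDbUtils = Proxy.newProxyInstance(
    getClass.getClassLoader,
    Array[Class[_]](classOf[FakeDbUtils]),
    new InvocationHandler {
      override def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
        val safeArgs = if (args == null) Array.empty[AnyRef] else args
        // Forwards every call to the backing instance; while it is still
        // null, Method.invoke throws the NullPointerException seen above.
        method.invoke(backing.get(), safeArgs: _*)
      }
    }
  ).asInstanceOf[FakeDbUtils]
}

// FakeHolder.dbutils.fs  // NullPointerException: backing not yet injected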
build.gradle
plugins {
    id 'scala'
    id 'java-library'
}

repositories {
    mavenCentral()
}

dependencies {
    // Use Scala 2.12 in our library project
    implementation 'org.scala-lang:scala-library:2.12.15'

    // Crucial implementation: the dbutils-api stub
    // https://mvnrepository.com/artifact/com.databricks/dbutils-api
    implementation group: 'com.databricks', name: 'dbutils-api_2.12', version: '0.0.5'

    implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.32'
    implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '3.0.0'
    implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '3.0.0'
    implementation 'com.google.guava:guava:30.1.1-jre'

    testImplementation 'junit:junit:4.13.2'
    testImplementation 'org.scalatest:scalatest_2.12:3.2.9'
    testImplementation 'org.scalatestplus:junit-4-13_2.12:3.2.2.0'
    testImplementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.32'
    testRuntimeOnly 'org.scala-lang.modules:scala-xml_2.12:1.2.0'

    api 'org.apache.commons:commons-math3:3.6.1'
}
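For what it's worth, the listener itself behaves fine off-Databricks, where the DATABRICKS_RUNTIME_VERSION guard keeps the dbutils branch from running. A quick local check (plain main method; slf4j-simple from the test classpath provides the logging backend):
package my.custom.listener

import org.apache.spark.scheduler.SparkListenerApplicationStart

// Quick smoke test: constructing the listener and firing the event works
// outside Databricks because the env-var guard skips the dbutils call.
object ListenerSmokeTest {
  def main(args: Array[String]): Unit = {
    val listener = new LibraryListener()
    listener.onApplicationStart(
      SparkListenerApplicationStart(
        "smoke-test", Some("local-app-1"), System.currentTimeMillis(), "tester", None))
  }
}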
Thank you for any insights!