
Using Azure Databricks Runtime 9.1, I want to register a SparkListener and access dbutils features inside that listener.

This listener should log some information when the Spark application starts. As a simple example, it should list the file system using dbutils.fs.ls.

The question How to properly access dbutils in Scala when using Databricks Connect is very close to what I'm looking for, but it is focused on Databricks Connect, whereas I want dbutils inside a SparkListener. That question does point to the dbutils API library on the MS Docs page, which seems to indicate that I need only specify the correct target and version of the dbutils-api package.

In the sample listener below...

  • If I do not include the import com.databricks.dbutils_v1.DBUtilsHolder.dbutils, the jar fails to compile, since I reference dbutils in the onApplicationStart method.
  • When I do include the import, it compiles successfully.
    • However, the SparkListener fails to initialize.
    • I receive a NullPointerException as soon as the dbutils.fs.ls call is dispatched (the stack trace below shows the call going through DBUtilsHolder's reflective proxy). A guarded variant of the call is sketched after this list.
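For reference, here is a minimal guarded variant of the listener I have in mind (untested; GuardedLibraryListener is a hypothetical name, and it assumes the NullPointerException simply means the real dbutils implementation has not been injected into the proxy yet):

package my.custom.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}
import org.slf4j.LoggerFactory
import scala.util.{Failure, Success, Try}

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

class GuardedLibraryListener extends SparkListener {
  private val log = LoggerFactory.getLogger(classOf[GuardedLibraryListener])

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // Wrap the proxy call so an uninitialized dbutils logs a warning
      // instead of failing listener registration with a NullPointerException.
      Try(dbutils.fs.ls("dbfs:/")) match {
        case Success(files) => log.info(files.map(_.path).mkString(", "))
        case Failure(e)     => log.warn(s"dbutils not available yet: ${e.getMessage}")
      }
    }
  }
}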

Any thoughts and/or guidance would be greatly appreciated!

Sample Listener Using dbutils on Application Start

package my.custom.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}
import org.slf4j.LoggerFactory

// Crucial Import
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

class LibraryListener extends SparkListener {
  private var isDatabricks = false
  val log = LoggerFactory.getLogger(classOf[LibraryListener])

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    log.info("HELLO WORLD!")
    log.info(s"App Name ${applicationStart.appName}")
    log.info(s"User  ${applicationStart.sparkUser}")

    isDatabricks = sys.env.contains("DATABRICKS_RUNTIME_VERSION")

    if (isDatabricks){
      log.info("WE ARE USING DATABRICKS!")
      // Dummy example of using dbutils; mkString turns the Seq[FileInfo] into a String for the logger
      log.info(dbutils.fs.ls("dbfs:/").mkString(", "))
    }
  }
}
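For context on the initialization path in the stack trace below: the setupAndStartListenerBus/loadExtensions frames correspond to Spark's spark.extraListeners mechanism, i.e. the listener is registered through the cluster's Spark configuration with an entry like:

spark.extraListeners my.custom.listener.LibraryListener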

Error Message From Spark Listener Initialization

org.apache.spark.SparkException: Exception when registering SparkListener
    at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2829)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:701)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext$1(DatabricksILoop.scala:347)
    at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
    at com.databricks.backend.daemon.driver.DriverCorral.driverContext(DriverCorral.scala:229)
    at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:102)
    at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:50)
    at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:287)
    at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:362)
    at com.databricks.DatabricksMain.$anonfun$main$1(DatabricksMain.scala:117)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData$1(DatabricksMain.scala:425)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:395)
    at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:484)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:504)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
    at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:85)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
    at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:85)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:479)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:404)
    at com.databricks.DatabricksMain.recordOperationWithResultTags(DatabricksMain.scala:85)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:395)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:367)
    at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:85)
    at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:425)
    at com.databricks.DatabricksMain.main(DatabricksMain.scala:116)
    at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.NullPointerException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.databricks.dbutils_v1.DBUtilsHolder$$anon$1.invoke(DBUtilsHolder.scala:17)
    at com.sun.proxy.$Proxy35.fs(Unknown Source)
    at my.custom.listener.LibraryListener.<init>(LibraryListener.scala:19)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3077)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3066)
    at org.apache.spark.SparkContext.$anonfun$setupAndStartListenerBus$1(SparkContext.scala:2810)
    at org.apache.spark.SparkContext.$anonfun$setupAndStartListenerBus$1$adapted(SparkContext.scala:2809)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:2809)
    ... 33 more

build.gradle

plugins {
    id 'scala'
    id 'java-library'
}

repositories {
    mavenCentral()
}

dependencies {
    // Use Scala 2.12 in our library project
    implementation 'org.scala-lang:scala-library:2.12.15'

    // Crucial Implementation
    // https://mvnrepository.com/artifact/com.databricks/dbutils-api
    implementation group: 'com.databricks', name: 'dbutils-api_2.12', version: '0.0.5'

    implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.32'
    implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '3.0.0'
    implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '3.0.0'
    implementation 'com.google.guava:guava:30.1.1-jre'

    testImplementation 'junit:junit:4.13.2'
    testImplementation 'org.scalatest:scalatest_2.12:3.2.9'
    testImplementation 'org.scalatestplus:junit-4-13_2.12:3.2.2.0'
    testImplementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.32'
    testRuntimeOnly 'org.scala-lang.modules:scala-xml_2.12:1.2.0'
    api 'org.apache.commons:commons-math3:3.6.1'
}

Thank you for any insights!

Will J
  • Hi, have you tried checking this? https://github.com/mspnp/spark-monitoring – dadadima Nov 14 '21 at 16:01
  • @dadadima - Thank you for commenting! Unfortunately, the spark-monitoring listener does not use dbutils. It is a great example of a well-written Spark listener, but it doesn't show how dbutils can be accessed from a listener. – Will J Nov 15 '21 at 04:25
  • Specifically - what functionality of dbutils do you need? dbutils may need a fully initialized SparkSession to work, and `ApplicationStart` could be too early. – Alex Ott Nov 20 '21 at 12:01
  • Thanks for replying @AlexOtt! I need dbutils.fs.mounts(), so perhaps I just need to try somewhere else, like onJobEnd! I will give that a shot! – Will J Nov 25 '21 at 03:46
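Following @AlexOtt's suggestion, here is an untested sketch of the onJobEnd approach (JobEndListener is a hypothetical name; whether dbutils is fully injected by the time a job ends is exactly what still needs verifying):

package my.custom.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.slf4j.LoggerFactory
import scala.util.{Failure, Success, Try}

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

// Untested sketch: defer the dbutils call until a job has finished, on the
// assumption that the Databricks runtime has injected dbutils by then.
class JobEndListener extends SparkListener {
  private val log = LoggerFactory.getLogger(classOf[JobEndListener])

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      Try(dbutils.fs.mounts()) match {
        case Success(mounts) => log.info(mounts.map(_.mountPoint).mkString(", "))
        case Failure(e)      => log.warn(s"dbutils still unavailable: ${e.getMessage}")
      }
    }
  }
}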

0 Answers