Using a Spark Listener on Databricks, I am trying to see whether a given class is installed, but given Databricks' way of installing packages, the Listener cannot see packages installed after the cluster has started.
In a Java Spark Listener, is there a better way to recognize that a class is installed for packages installed via Databricks' Libraries API / UI?
Summary
- Using a SparkListener installed via a cluster-scoped init script on Databricks.
- Using a ClassLoader in the Listener to check whether a given class is installed.
- On Apache Spark:
  - Works if the Listener is installed via `--packages` or `--jars`.
  - Fails if the Listener is installed via `--conf spark.driver.extraClassPath` and the desired libraries were installed via `--packages` or `--jars`.
- On (Azure) Databricks:
  - Works if the library already exists in the `/databricks/jars` directory, which is the `$CLASSPATH` directory.
  - Fails if the library is installed via the Libraries API / UI (jars installed this way seem to end up in `/local_disk0/tmp`).
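To make the visibility gap above concrete, here is a small diagnostic sketch (not part of the original listener; the class name is just illustrative) that walks a classloader's parent chain and prints the URLs of any URLClassLoader it finds. Calling it with the listener's own classloader should show which loader, if any, actually has the `--packages` / `--jars` entries on its classpath.

import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;

public final class ClassLoaderProbe {
    // Walks the parent chain of the given ClassLoader and prints each loader,
    // plus its URLs when it happens to be a URLClassLoader.
    public static void dump(ClassLoader cl) {
        int depth = 0;
        while (cl != null) {
            System.out.println(depth + ": " + cl.getClass().getName());
            if (cl instanceof URLClassLoader) {
                System.out.println("    urls: " + Arrays.toString(((URLClassLoader) cl).getURLs()));
            }
            cl = cl.getParent();
            depth++;
        }
    }
}

From the listener this would be called as `ClassLoaderProbe.dump(MyListener.class.getClassLoader())`, for example from `onJobStart`.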
Spark Listener Details
With Apache Spark, I can install a Spark Listener via `--packages` + `--conf spark.extraListeners=listener.MyListener` and leverage a ClassLoader in the Spark Listener to check for any class installed through `--jars`, `--packages`, or on the class path. The listener that detects whether a class exists looks like this (a programmatic registration sketch follows it):
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyListener extends org.apache.spark.scheduler.SparkListener {

    private static final Logger log = LoggerFactory.getLogger("MyLogger");

    @Override
    public void onJobStart(SparkListenerJobStart jobStart) {
        // A class that ships with Spark itself.
        try {
            log.info("Trying LogicalRelation");
            MyListener.class.getClassLoader().loadClass(
                "org.apache.spark.sql.execution.datasources.LogicalRelation"
            );
            log.info("Got LogicalRelation");
        } catch (ClassNotFoundException e) {
            log.info("Couldn't find LogicalRelation");
        }

        // A class installed via --packages / the Databricks Libraries feature.
        try {
            log.info("Trying org.apache.iceberg.catalog.Catalog");
            MyListener.class.getClassLoader().loadClass("org.apache.iceberg.catalog.Catalog");
            log.info("Got org.apache.iceberg.catalog.Catalog!!!!");
        } catch (ClassNotFoundException e) {
            log.info("Could not get org.apache.iceberg.catalog.Catalog");
        }

        // Another class installed via --packages / the Databricks Libraries feature.
        try {
            log.info("Trying Kusto DefaultSource");
            MyListener.class.getClassLoader().loadClass("com.microsoft.kusto.spark.datasource.DefaultSource");
            log.info("Got Kusto DefaultSource!!!!");
        } catch (ClassNotFoundException e) {
            log.info("Could not get Kusto DefaultSource");
        }
    }
}
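For reference, the same registration can also be expressed programmatically when the session is built. This is only a minimal sketch of the open-source setup described above; the package coordinate is a placeholder rather than the exact library I test for.

import org.apache.spark.sql.SparkSession;

public class ListenerDemo {
    public static void main(String[] args) {
        // Equivalent of --conf spark.extraListeners=... plus --packages on open-source Spark.
        SparkSession spark = SparkSession.builder()
                .appName("listener-demo")
                .config("spark.extraListeners", "listener.MyListener")
                // Placeholder coordinate; any Maven package to be detected goes here.
                .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0")
                .getOrCreate();

        // Trigger a job so onJobStart fires and the class checks run.
        spark.range(10).count();
        spark.stop();
    }
}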
On Databricks, the listener is installed via an init script that looks like:
cp -f /dbfs/databricks/custom/listener.jar /mnt/driver-daemon/jars || { echo "Error"; exit 1;}
cat << 'EOF' > /databricks/driver/conf/customer-listener.conf
[driver] {
"spark.extraListeners" = "listener.MyListener"
}
EOF
This installation approach is similar to the one used by other public listeners.
Attempted to use URLClassLoader
It seems that the Scala ClassLoader doesn't play nicely with a Java ClassLoader. I attempted to add a URLClassLoader, as per another SO post on setting a different classloader, but the ClassNotFoundException persists.
This code, however, does successfully find my test classes when run in a Databricks interactive notebook:
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;

URLClassLoader ucl;
try {
    log.info("URL Class Loader Attempt V3");
    // Point a URLClassLoader at the directory Databricks downloads libraries into.
    File file = new File("/local_disk0/tmp/");
    URL classUrl = file.toURI().toURL();
    URL[] urls = new URL[] { classUrl };
    System.out.println(Arrays.toString(urls));
    ucl = new URLClassLoader(urls, getClass().getClassLoader());
    ucl.loadClass("com.microsoft.kusto.spark.datasource.DefaultSource");
    try {
        ucl.close();
    } catch (IOException e) {
        log.error("Failed to close url classloader");
    }
    log.info("GOT KustoLIBRARY with URL Class Loader!");
} catch (ClassNotFoundException e) {
    // Still hitting this one when the code runs inside the listener
    log.info("Could not get Kusto Library with URLClassLoader");
} catch (MalformedURLException e) {
    log.info("The URL was malformed");
}
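One detail that may be relevant to the attempt above: a URLClassLoader treats a URL ending in `/` as a directory of .class files, so jar files sitting inside `/local_disk0/tmp` are not searched unless each jar gets its own URL. Below is a variant that enumerates the jars first; the helper name is mine, and I have not confirmed it changes the outcome inside the listener.

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

// Builds one URL per jar in the Databricks download directory and checks whether
// the class can be loaded through those jars (hypothetical helper, not the original code).
static boolean findInDownloadedJars(String className, ClassLoader parent) {
    File[] jars = new File("/local_disk0/tmp/").listFiles((dir, name) -> name.endsWith(".jar"));
    if (jars == null) {
        return false;
    }
    List<URL> urls = new ArrayList<>();
    try {
        for (File jar : jars) {
            urls.add(jar.toURI().toURL()); // one URL per jar, not the directory itself
        }
        try (URLClassLoader jarLoader = new URLClassLoader(urls.toArray(new URL[0]), parent)) {
            jarLoader.loadClass(className);
            return true;
        }
    } catch (ClassNotFoundException | IOException e) {
        return false;
    }
}

It would be called from the listener as `findInDownloadedJars("com.microsoft.kusto.spark.datasource.DefaultSource", MyListener.class.getClassLoader())`.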
Databricks Library Details
With Databricks, the majority of users rely on the Libraries feature, which installs jars AFTER Spark has started and lets users easily install a jar via the Databricks UI or through an API.
When using the above listener, the ClassLoader consistently raises a `ClassNotFoundException` for packages installed via the Libraries API.
In the Databricks driver logs, I can see the desired jar being installed:
22/07/14 13:32:34 INFO DriverCorral: [Thread 123] AttachLibraries - candidate libraries: List(JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:34 INFO DriverCorral: [Thread 123] AttachLibraries - new libraries to install (including resolved dependencies): List(JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE))
22/07/14 13:32:37 INFO SharedDriverContext: [Thread 123] attachLibrariesToSpark JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:37 INFO LibraryDownloadManager: Downloading a library that was not in the cache: JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE)
22/07/14 13:32:37 INFO LibraryDownloadManager: Attempt 1: wait until library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) is downloaded
22/07/14 13:32:37 INFO LibraryDownloadManager: Downloaded library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) as local file /local_disk0/tmp/addedFile2043314239110388521kusto_spark_3_0_2_12_3_0_0-6add9.jar in 39 milliseconds
22/07/14 13:32:37 INFO SharedDriverContext: Successfully saved library JavaJarId(dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar,,NONE) to local file /local_disk0/tmp/addedFile2043314239110388521kusto_spark_3_0_2_12_3_0_0-6add9.jar
22/07/14 13:32:37 INFO SharedDriverContext: Successfully attached library dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar to Spark
22/07/14 13:32:37 INFO LibraryState: [Thread 123] Successfully attached library dbfs:/FileStore/jars/maven/com/microsoft/azure/kusto/kusto-spark_3.0_2.12-3.0.0.jar
If I install the desired jar/package and all of its dependencies into the `/databricks/jars` folder, the Spark Listener can successfully detect that the packages are installed (confirmed by a Databricks employee on SO). However, this is not common practice given the Databricks Libraries feature.
So, it all seems to boil down to this: how do I get the main ClassLoader on a Databricks interactive or job cluster to recognize libraries installed via the Spark application context (i.e., via the Libraries API / UI)?
Thank you for any insights!