
I'm trying to package a jar based on code from a Databricks notebook.

I have the following code, which works in Databricks but throws an error in the standalone Scala code:

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession
                  .builder()
                  .appName("myApp")
                  .master("local")
                  .enableHiveSupport()
                  .getOrCreate()

val sc = SparkContext.getOrCreate()
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import spark.implicits._
import sqlContext.implicits._

...

var file_details = dbutils.fs.ls(folder_path2).toDF()

Which gives the error:

error: value toDF is not a member of Seq[com.databricks.backend.daemon.dbutils.FileInfo]

Does anyone know how to use dbutils.fs.ls().toDF() in a Scala .jar?


Edit: I found a similar question for PySpark that I'm trying to translate to Scala:

val dbutils = com.databricks.service.DBUtils

import org.apache.spark.sql.types._

val ddlSchema = new ArrayType(
  new StructType()
    .add("path", StringType)
    .add("name", StringType)
    .add("size", IntegerType),
  true)

var folder_path = "abfss://container@storage.dfs.core.windows.net"
var file_details = dbutils.fs.ls(folder_path)

var df = spark.createDataFrame(sc.parallelize(file_details),ddlSchema)

but I'm getting this error:

error: overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[com.databricks.service.FileInfo], org.apache.spark.sql.types.ArrayType)
       var df = spark.createDataFrame(sc.parallelize(file_details),ddlSchema)
  • It looks like you are missing some "import" that brings extension method "toDF" into your program scope. I guess that import is implicitly in scope in Databricks (notebook you mean?), but not in your standalone program. I would look at documentation of this class `DBUtilsHolder` – Alexey Novakov Oct 26 '21 at 18:13
  • thru other questions, i thought `import spark.implicits._` was the import that brought in .toDF() – steven hurwitt Oct 26 '21 at 20:50
  • @AlexeyNovakov is there a way to see the attributes of a given class in scala? in python i could do something like `dir(dbutils.fs.ls)`. **Edit:** I think `dbutils.getClass.getMethods` is what I want in scala – steven hurwitt Oct 26 '21 at 21:01
  • `file_details.toDF` works just fine for me in the notebooks at least. Are you trying to use it with databricks-connect? then dbutils may not work, and it's better to use Hadoop interface: https://docs.databricks.com/dev-tools/databricks-connect.html#access-the-hadoop-filesystem – Alex Ott Oct 27 '21 at 05:30
  • I'm trying to use it in a .jar file in my IDE is the problem. so i'm using the dbutils api in my pom file, which isn't really meant for use outside of notebooks :/ – steven hurwitt Oct 27 '21 at 13:12
  • @stevenhurwitt I think you need to convert FileInfo to Row type. See example here: https://sparkbyexamples.com/spark/different-ways-to-create-a-spark-dataframe/#from-rdd – Alexey Novakov Oct 27 '21 at 13:50
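
As Alex Ott's comment suggests, with databricks-connect the more reliable route is the Hadoop FileSystem interface rather than dbutils. A minimal sketch of that approach, reusing spark and folder_path from the question (and assuming the ABFS connector is already configured for the storage account):

import org.apache.hadoop.fs.Path
import spark.implicits._

// List the folder through the Hadoop FileSystem API instead of dbutils
val root = new Path(folder_path)
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)

val fileDf = fs.listStatus(root)
  .map(s => (s.getPath.toString, s.getPath.getName, s.getLen)) // (path, name, size)
  .toSeq
  .toDF("path", "name", "size")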

2 Answers


Ok I got it!!! Here is the code I used:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

var file_details = dbutils.fs.ls(folder_path)
// Convert each FileInfo into a plain tuple, then into an RDD of Rows
var fileData = file_details.map(x => (x.path, x.name, x.size.toString))
var rdd = sc.parallelize(fileData)
val rowRDD = rdd.map(attributes => Row(attributes._1, attributes._2, attributes._3.toInt))

val schema = StructType(Array(
  StructField("path", StringType, true),
  StructField("name", StringType, true),
  StructField("size", IntegerType, true)
))

var fileDf = spark.createDataFrame(rowRDD, schema)
– steven hurwitt
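
As a shorthand for the same idea: since spark.implicits._ already provides encoders for tuples, mapping each FileInfo to a plain tuple lets toDF be called directly, without building the Row RDD and schema by hand (note that size then comes through as a Long rather than an Int):

import spark.implicits._

val fileDf = dbutils.fs.ls(folder_path)
  .map(f => (f.path, f.name, f.size)) // Seq[(String, String, Long)]
  .toDF("path", "name", "size")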

In order to trigger the implicit conversion to a Dataset-like container, and thus have toDF() available, you also need an implicit Spark Encoder (besides the spark.implicits._ that is already present).

I think this auto-derivation will work and will make toDF() available:

  implicit val encoder = org.apache.spark.sql.Encoders.product[com.databricks.backend.daemon.dbutils.FileInfo]

Otherwise, yes, you can work directly with RDDs.

– gatear
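
Putting that encoder suggestion together end to end, a minimal sketch of how it might look (assuming com.databricks.backend.daemon.dbutils.FileInfo is a case class that is on the compile classpath of the jar):

import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

// An explicit Encoder for FileInfo lets spark.implicits._ lift
// Seq[FileInfo] into a Dataset, which is what exposes toDF()
implicit val fileInfoEncoder: Encoder[com.databricks.backend.daemon.dbutils.FileInfo] =
  Encoders.product[com.databricks.backend.daemon.dbutils.FileInfo]

val fileDf = dbutils.fs.ls(folder_path2).toDF()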