
I have the following class; its `run` method returns a list of ints from a database table.

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
  }
}

The following code

val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
val processed = itemListJob.run(date).select("id").map(d => {
  runJob.run(d) // d expected to be int
})
processed.saveAsTextFile("c:\\temp\\mpa")

gets this error:

[error] ...\src\main\scala\main.scala:39: type mismatch;
[error]  found   : org.apache.spark.sql.Row
[error]  required: Int
[error]       runJob.run(d)
[error]                  ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

I tried

  1. val processed = itemListJob.run(date).select("id").as[Int].map(d =>
  2. case class itemListRow(id: Int); ....as[itemListRow].

Both of them got the following error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Update: I tried adding the implicits import statements:

  1. `import sc.implicits._` got the error:

    value implicits is not a member of org.apache.spark.SparkContext

  2. `import sqlContext.implicits._` is OK. However, the later statement `processed.saveAsTextFile("c:\\temp\\mpa")` then got the error:

    value saveAsTextFile is not a member of org.apache.spark.sql.Dataset[(Int, java.time.LocalDate)]

ca9163d9
  • What's the error when using `as[Int]`? – zsxwing May 23 '17 at 22:17
  • The error is `Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.` – ca9163d9 May 23 '17 at 22:19
  • Did you try `import spark.implicits._`? – zsxwing May 23 '17 at 22:23
  • It got the error of `not found: value spark` on `import spark.implicits._` – ca9163d9 May 23 '17 at 22:29
  • You need to create a spark session as described here: http://spark.apache.org/docs/latest/sql-programming-guide.html#starting-point-sparksession (see the sketch after these comments) – Steffen Schmitz May 23 '17 at 22:36
  • I saw `sqlContext` in your codes. So you can use `import sqlContext.implicits._`. – zsxwing May 23 '17 at 23:21
  • Or `import sqlContext.sparkSession.implicits._` – zsxwing May 23 '17 at 23:22
  • Thanks. I actually have one "SC". Just wondering if it's the best way to use implicits? Or any other approaches? – ca9163d9 May 23 '17 at 23:27
  • It must be either `SQLContext` or `SparkSession`. You can also get `SparkSession` from `Dataset.sparkSession` – zsxwing May 23 '17 at 23:30
  • @zsxwing I changed the code to use all data frame/dataset. However, it got a new error. I've posted a question for it https://stackoverflow.com/questions/44168394/spark-java-lang-unsupportedoperationexception-no-encoder-found-for-java-time-l?noredirect=1#comment75353157_44168394 – ca9163d9 May 24 '17 at 21:23
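
For reference, a minimal sketch of the SparkSession approach mentioned in Steffen Schmitz's comment (illustrative names only; in Spark 2.x a SparkSession subsumes the SQLContext and exposes the same implicits):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Test")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._ // the encoders for Int, case classes, etc. come from here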

1 Answer


You should simply change the line with select("id") to be as follows:

select("id").as[Int]

You also need to import the implicits that provide the encoders for converting Rows to Ints.

import sqlContext.implicits._ // <-- import implicits that add the "magic"
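
At the call site that boils down to something like this (a rough sketch; it assumes runJob.run accepts an Int and returns a type Spark has an encoder for):

import sqlContext.implicits._ // the Int encoder comes from here

val processed = itemListJob.run(date)
  .select("id")              // keep only the "id" column
  .as[Int]                   // Dataset[Row] -> Dataset[Int]
  .map(id => runJob.run(id)) // id is an Int here, as runJob.run expects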

You could also change run to include the conversion as follows (note the comments on the lines I added):

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ // <-- import implicits that add the "magic"
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") // <-- take only "id" (which Spark pushes down and hence makes your query faster
    .as[Int] // <-- convert Row into Int
  }
}

value saveAsTextFile is not a member of org.apache.spark.sql.Dataset[(Int, java.time.LocalDate)]

The compilation error is because you're trying to use the saveAsTextFile operation on a Dataset, where it is not available.

Writing in Spark SQL goes through DataFrameWriter, which is available via the write operator:

write: DataFrameWriter[T] Interface for saving the content of the non-streaming Dataset out into external storage.

So you should do the following:

processed.write.text("c:\\temp\\mpa")

Done!
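
If you specifically want the old saveAsTextFile behavior, a sketch of one alternative is to go through the Dataset's underlying RDD, which still has that method (note that the text sink above expects a single string column, so the RDD route can be simpler when the Dataset has several columns):

processed.rdd.saveAsTextFile("c:\\temp\\mpa") // each element is written using its toString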

Jacek Laskowski
  • I changed the `ItemList.run()` to return a DataSet as you suggested. However, I need to add `import sqlContext.implicits._` before calling `val processed = itemListJob.run(...).map(....)`. – ca9163d9 May 24 '17 at 20:54