I'm struggling to write a basic unit test for creation of a data frame, using the example text file provided with Spark, as follows.

class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {

private val master = "local[*]"
private val appName = "data_load_testing"

private var spark: SparkSession = _

override def beforeEach() {
  spark = new SparkSession.Builder().appName(appName).getOrCreate()
}

import spark.implicits._

 case class Person(name: String, age: Int)

  val df = spark.sparkContext
      .textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0),attributes(1).trim.toInt))
      .toDF()

  test("Creating dataframe should produce data from of correct size") {
  assert(df.count() == 3)
  assert(df.take(1).equals(Array("Michael",29)))
}

override def afterEach(): Unit = {
  spark.stop()
}

}

I know that the code itself works (from import spark.implicits._ through to toDF()) because I have verified it in the Spark Scala shell, but inside the test class I'm getting lots of errors: the IDE does not recognise import spark.implicits._ or toDF(), and therefore the tests don't run.

I am using SparkSession which automatically creates SparkConf, SparkContext and SQLContext under the hood.

My code simply uses the example code from the Spark repo.

Any ideas why this is not working? Thanks!

NB. I have already looked at the Spark unit test questions on StackOverflow, like this one: How to write unit tests in Spark 2.0+? I have used this to write the test but I'm still getting the errors.

I'm using Scala 2.11.8, and Spark 2.2.0 with SBT and IntelliJ. These dependencies are correctly included within the SBT build file. The errors on running the tests are:

Error:(29, 10) value toDF is not a member of org.apache.spark.rdd.RDD[dataLoadTest.this.Person] possible cause: maybe a semicolon is missing before `value toDF'? .toDF()

Error:(20, 20) stable identifier required, but dataLoadTest.this.spark.implicits found. import spark.implicits._

IntelliJ won't recognise import spark.implicits._ or the .toDF() method.

I have imported:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, FlatSpec, FunSuite, Matchers}

Marcin
LucieCBurgess
  • What are you actually trying to test here? This test basically just tests that your data have 3 rows. – Marcin Aug 08 '17 at 16:15
  • However, the actual answer to your question is that the simplest way is to add Spark as a dependency and have your build tool run the tests. – Marcin Aug 08 '17 at 16:16
  • @Marcin I could use SBT to run the tests from the command line but actually I prefer the comfort of my IDE :-) – LucieCBurgess Aug 08 '17 at 16:20
  • Then this is a question about intellij and why its sbt integration doesn't work properly, unless you have the same problem with sbt. – Marcin Aug 08 '17 at 16:42
  • Hi Marcin, it's not an SBT question. I have verified that SBT is working, I just prefer to use SBT within IntelliJ. Thanks for your help though. @Ramesh_Maharjan has fixed it for me :-) – LucieCBurgess Aug 08 '17 at 17:19

2 Answers

You need to assign sqlContext to a val for the implicits import to work. Because your SparkSession is a var, it is not a stable identifier, so import spark.implicits._ will not compile.

So you need to do

val sQLContext = spark.sqlContext
import sQLContext.implicits._
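
The "stable identifier required" error is a Scala language rule rather than anything specific to Spark. Here is a minimal sketch of it in plain Scala, with no Spark involved. The Session class, its implicits object and the doubled method are hypothetical stand-ins invented for this illustration; they are not part of Spark's API.

```scala
// Hypothetical stand-in for SparkSession; NOT Spark's API.
class Session {
  object implicits {
    // Extension method made available by importing implicits._
    implicit class ListOps(xs: List[Int]) {
      def doubled: List[Int] = xs.map(_ * 2)
    }
  }
}

object StableIdentifierDemo {
  // A var, like `spark` in the question.
  var session: Session = new Session

  // Uncommenting the next line fails to compile with
  // "stable identifier required", because `session` is a var:
  // import session.implicits._

  def run(): List[Int] = {
    // Binding the current value to a val gives the compiler a stable path.
    val stable = session
    import stable.implicits._
    List(1, 2, 3).doubled // returns List(2, 4, 6)
  }
}
```

The same reasoning applies to val sQLContext = spark.sqlContext above: the val is what makes the import legal.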

Moreover, you can do this work inside the test function, so that your test class looks as follows:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}

class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {

  private val master = "local[*]"
  private val appName = "data_load_testing"

  var spark: SparkSession = _

  // Create the session before each test; master is required to run locally.
  override def beforeEach(): Unit = {
    spark = SparkSession.builder().appName(appName).master(master).getOrCreate()
  }

  test("Creating dataframe should produce data from of correct size") {
    // Bind to a val: implicits can only be imported from a stable identifier.
    val sQLContext = spark.sqlContext
    import sQLContext.implicits._

    // Build the DataFrame inside the test, after beforeEach has set spark.
    val df = spark.sparkContext
      .textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()

    assert(df.count() == 3)
    assert(df.take(1)(0)(0).equals("Michael"))
  }

  override def afterEach(): Unit = {
    spark.stop()
  }

}

// Defined outside the test class.
case class Person(name: String, age: Int)
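
One reason to build the DataFrame inside the test function: in the question, df is a val in the class body, so it is evaluated when the suite is constructed, before beforeEach has assigned spark. That ordering can surface as a NullPointerException. Below is a minimal plain-Scala sketch of the ordering. EagerSuite, setUp and resource are hypothetical names made up for this illustration; they are not ScalaTest's API.

```scala
import scala.util.Try

// Hypothetical stand-in for a suite with a beforeEach-style hook; NOT ScalaTest's API.
class EagerSuite {
  // Null until setUp() runs, like `spark` before beforeEach().
  var resource: String = _

  def setUp(): Unit = { resource = "ready" }

  // Evaluated during construction, before anyone can call setUp().
  // The NullPointerException is caught here only to make the failure visible.
  val eagerLength: Option[Int] = Try(resource.length).toOption

  // Evaluated only when called, so it sees the value assigned by setUp().
  def lazyLength: Int = resource.length
}
```

Constructing an EagerSuite leaves eagerLength empty because the lookup failed during construction, whereas calling lazyLength after setUp() succeeds.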
Ramesh Maharjan
  • @Ramesh_Maharjan thanks for your help! Unfortunately I'm still getting the following error using your changes: Error:(32, 8) value toDF is not a member of org.apache.spark.rdd.RDD[Person] possible cause: maybe a semicolon is missing before `value toDF'? .toDF() – LucieCBurgess Aug 08 '17 at 16:28
  • you need to move the case class inside the class. – Ramesh Maharjan Aug 08 '17 at 16:36
  • @Ramesh_Maharjan thanks! Moving the case class gets rid of the .toDF() error but now I get a NPE at the val sQLContext line: java.lang.NullPointerException was thrown. java.lang.NullPointerException at data_load.dataLoadTest$$anonfun$1.apply$mcV$sp(dataLoadTest.scala:24) at data_load.dataLoadTest$$anonfun$1.apply(dataLoadTest.scala:23) at data_load.dataLoadTest$$anonfun$1.apply(dataLoadTest.scala:23) – LucieCBurgess Aug 08 '17 at 16:59
  • Sorry, but what did you change? I already added the master(master) to the SparkSession but I'm still getting the NPE :-( Thanks so much for your help, I really appreciate it! – LucieCBurgess Aug 08 '17 at 17:07
  • I have moved the case class outside and I have changed the assertions. – Ramesh Maharjan Aug 08 '17 at 17:08
  • NPE is because you are not using test function as suggested by me. – Ramesh Maharjan Aug 08 '17 at 17:18
  • Thanks @Ramesh_Maharjan! You fixed it. Can I just ask please, why does the case class need to be outside the scope of the test object? And if you can explain the SQLContext magic, that would be useful - the Spark API describes it as "a wrapped version of this session in the form of an SQLContext, for backward compatibility". Thanks so much for helping! – LucieCBurgess Aug 08 '17 at 17:21
  • you can accept my answer if it really helped you :) you can upvote too :) – Ramesh Maharjan Aug 08 '17 at 17:23
  • Case classes have encoders defined and are used for the row representation of a DataFrame, which needs to be serialized and deserialized. If the case class is written inside another scope, it is difficult to move around. That's why it needs to be outside that scope. – Ramesh Maharjan Aug 08 '17 at 17:29
  • @Ramesh_Maharjan strikes again! – wllmtrng Aug 08 '17 at 22:54

There are many libraries for unit testing Spark. One of the most widely used is

spark-testing-base, by Holden Karau

This library sets everything up for you, with sc as the SparkContext. Below is a simple example:

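import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite

// WordCount is assumed to be the class under test, defined elsewhere in your project.
class TestSharedSparkContext extends FunSuite with SharedSparkContext {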

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

Here, everything is prepared for you, with sc as the SparkContext.

Another approach is to create a test wrapper trait and reuse it across multiple test cases, as below:

import org.apache.spark.sql.SparkSession

trait TestSparkWrapper {

  lazy val sparkSession: SparkSession = 
    SparkSession.builder().master("local").appName("spark test example ").getOrCreate()

}

Then mix this TestSparkWrapper into all your ScalaTest suites, using BeforeAndAfterAll and BeforeAndAfterEach as needed.
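
As a rough sketch of how such a wrapper might be mixed into a suite: PersonSuite and the in-memory sample rows are made up for this illustration, and the code assumes Spark 2.x with a ScalaTest version that still provides org.scalatest.FunSuite, as in the question. Because sparkSession is a lazy val it is a stable identifier, so its implicits can be imported directly.

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Repeated from the answer so that the sketch is self-contained.
trait TestSparkWrapper {
  lazy val sparkSession: SparkSession =
    SparkSession.builder().master("local").appName("spark test example").getOrCreate()
}

// Top-level case class, outside the suite.
case class Person(name: String, age: Int)

// Hypothetical suite name; the rows below are sample data, not people.txt.
class PersonSuite extends FunSuite with BeforeAndAfterAll with TestSparkWrapper {

  test("toDF keeps every row and names columns after the case class fields") {
    // A lazy val is a stable identifier, so this import compiles.
    import sparkSession.implicits._

    val df = Seq(Person("Michael", 29), Person("Andy", 30)).toDF()

    assert(df.count() == 2)
    assert(df.columns.toSeq == Seq("name", "age"))
  }

  // Stop the shared session once, after all tests in this suite.
  override def afterAll(): Unit = {
    sparkSession.stop()
    super.afterAll()
  }
}
```

Using BeforeAndAfterAll here means the session is created once per suite rather than once per test, which is usually faster. If tests need a fresh session each time, BeforeAndAfterEach is the alternative.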

Hope this helps!

koiralo