92

I've been trying to find a reasonable way to test SparkSession with the JUnit testing framework. While there seem to be good examples for SparkContext, I couldn't figure out how to get a corresponding example working for SparkSession, even though it is used in several places internally in spark-testing-base. I'd be happy to try a solution that doesn't use spark-testing-base as well if it isn't really the right way to go here.

Simple test case (complete MWE project with build.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

The result of running this with JUnit is an NPE at the load line:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Note that it shouldn't matter whether the file being loaded exists or not; with a properly configured SparkSession, a more sensible error would be thrown.

bbarker
  • Thanks to all for the responses so far; I hope to review soon. I also opened up an issue and am cross referencing it here: https://github.com/holdenk/spark-testing-base/issues/180 – bbarker May 05 '17 at 20:15
  • Unfortunately, I still haven't gotten around to actually using Spark ... some day, maybe 3.x at this rate - otherwise I would work on accepting an answer. Glad this has been useful to others. – bbarker Apr 13 '18 at 02:20

6 Answers

132

Thank you for putting this outstanding question out there. For some reason, when it comes to Spark, everyone gets so caught up in the analytics that they forget about the great software engineering practices that have emerged over the last 15 years or so. This is why we make it a point to discuss testing and continuous integration (among other things like DevOps) in our course.

A Quick Aside on Terminology

A true unit test means you have complete control over every component in the test. There can be no interaction with databases, REST calls, file systems, or even the system clock; everything has to be "doubled" (e.g. mocked, stubbed, etc.), as Gerard Meszaros puts it in xUnit Test Patterns. I know this seems like semantics, but it really matters. Failing to understand this is one major reason why you see intermittent test failures in continuous integration.

We Can Still Unit Test

So given this understanding, unit testing an RDD is impossible. However, there is still a place for unit testing when developing analytics.

Consider a simple operation:

rdd.map(foo).map(bar)

Here foo and bar are simple functions. Those can be unit tested in the normal way, and they should be, with as many corner cases as you can muster. After all, why should they care where their inputs come from, whether it is a test fixture or an RDD?
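As a rough illustration, here is a minimal sketch of testing such functions without any Spark involved. The bodies of foo and bar, and the object names Transformations and TransformationsCheck, are hypothetical stand-ins chosen for the example; plain assert is used so the snippet needs no test framework.

```scala
// Hypothetical stand-ins for the foo and bar passed to rdd.map above.
object Transformations {
  // Parses a line into an Int, treating blank or malformed input as 0.
  def foo(line: String): Int = scala.util.Try(line.trim.toInt).getOrElse(0)

  // Doubles a value.
  def bar(n: Int): Int = n * 2
}

object TransformationsCheck {
  import Transformations._

  def main(args: Array[String]): Unit = {
    // Corner cases for foo: surrounding whitespace, garbage, empty input.
    assert(foo(" 42 ") == 42)
    assert(foo("not a number") == 0)
    assert(foo("") == 0)

    // bar on an ordinary value and on zero.
    assert(bar(21) == 42)
    assert(bar(0) == 0)

    // The same composition as rdd.map(foo).map(bar), but over a plain Seq.
    assert(Seq("1", "x", "3").map(foo).map(bar) == Seq(2, 0, 6))
  }
}
```

Because the functions never see an RDD, the same checks can move into whichever test framework you already use.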

Don't Forget the Spark Shell

This isn't testing per se, but in these early stages you also should be experimenting in the Spark shell to figure out your transformations and especially the consequences of your approach. For example, you can examine physical and logical query plans, partitioning strategy and preservation, and the state of your data with many different functions like toDebugString, explain, glom, show, printSchema, and so on. I will let you explore those.

You can also set your master to local[2] in the Spark shell and in your tests to identify any problems that may only arise once you start to distribute work.

Integration Testing with Spark

Now for the fun stuff.

In order to integration test Spark after you feel confident in the quality of your helper functions and RDD/DataFrame transformation logic, it is critical to do a few things (regardless of build tool and test framework):

  • Increase JVM memory.
  • Enable forking but disable parallel execution.
  • Use your test framework to accumulate your Spark integration tests into suites, and initialize the SparkContext before all tests and stop it after all tests.
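With sbt, the first two points could be expressed roughly as follows. This is a sketch assuming sbt 1.x slash syntax; the heap sizes are arbitrary placeholder values, not recommendations from this answer.

```scala
// build.sbt (sbt 1.x syntax); the memory values are assumptions to adjust.

// Run tests in a separate, forked JVM so the options below take effect.
Test / fork := true

// Avoid several suites creating a SparkContext in the same JVM at once.
Test / parallelExecution := false

// Give the forked test JVM more memory.
Test / javaOptions ++= Seq("-Xms512M", "-Xmx2G")
```

Note that javaOptions only applies to forked JVMs, which is one reason forking and the memory setting go together.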

With ScalaTest, you can mix in BeforeAndAfterAll (which I generally prefer) or BeforeAndAfterEach, as @ShankarKoirala does, to initialize and tear down Spark artifacts. I know this is a reasonable place to make an exception, but I really don't like the mutable vars you have to use.

The Loan Pattern

Another approach is to use the Loan Pattern.

For example (using ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

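import org.apache.spark.{SparkConf, SparkContext}

trait SparkContextSetup {
  // Lends a fresh local SparkContext to the test body and always stops it afterwards.
  def withSparkContext(testMethod: SparkContext => Any): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
}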

As you can see, the Loan Pattern makes use of higher-order functions to "loan" the SparkContext to the test and then to dispose of it after it's done.
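The same idea should carry over to the SparkSession asked about in the question. The following is a minimal sketch, assuming Spark 2.x or later on the test classpath; SparkSessionSetup, withSparkSession, and SparkSessionSetupExample are hypothetical names made up for this example, not part of Spark or ScalaTest.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper trait: the SparkSession counterpart of SparkContextSetup.
trait SparkSessionSetup {
  // Lends a local SparkSession to the test body and stops it afterwards,
  // even if the body throws.
  def withSparkSession[T](testMethod: SparkSession => T): T = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Spark test")
      .getOrCreate()
    try testMethod(spark)
    finally spark.stop()
  }
}

// Hypothetical usage outside any test framework.
object SparkSessionSetupExample extends SparkSessionSetup {
  def main(args: Array[String]): Unit = {
    val count = withSparkSession { spark =>
      import spark.implicits._
      // Build a tiny in-memory Dataset and count its rows.
      Seq("a", "b", "c").toDS().count()
    }
    assert(count == 3L)
  }
}
```

Returning the body's result (the type parameter T) is a design choice that lets assertions live either inside or outside the loaned block.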

Suffering-Oriented Programming (Thanks, Nathan)

It is totally a matter of preference, but I prefer to use the Loan Pattern and wire things up myself as long as I can before bringing in another framework. Aside from just trying to stay lightweight, frameworks sometimes add a lot of "magic" that makes debugging test failures hard to reason about. So I take a Suffering-Oriented Programming approach--where I avoid adding a new framework until the pain of not having it is too much to bear. But again, this is up to you.

The best choice for that alternate framework is of course spark-testing-base as @ShankarKoirala mentioned. In that case, the test above would look like this:

class MySpec extends WordSpec with Matchers with SharedSparkContext {
  "My analytics" should {
    "calculate the right thing" in {
      val data = Seq(...)
      val rdd = sc.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

Note how I didn't have to do anything to deal with the SparkContext. SharedSparkContext gave me all that--with sc as the SparkContext--for free. Personally though I wouldn't bring in this dependency for just this purpose since the Loan Pattern does exactly what I need for that. Also, with so much unpredictability that happens with distributed systems, it can be a real pain to have to trace through the magic that happens in the source code of a third-party library when things go wrong in continuous integration.

Now where spark-testing-base really shines is with the Hadoop-based helpers like HDFSClusterLike and YARNClusterLike. Mixing those traits in can really save you a lot of setup pain. Another place where it shines is with the Scalacheck-like properties and generators--assuming of course you understand how property-based testing works and why it is useful. But again, I would personally hold off on using it until my analytics and my tests reach that level of sophistication.

"Only a Sith deals in absolutes." -- Obi-Wan Kenobi

Of course, you don't have to choose one or the other either. Perhaps you could use the Loan Pattern approach for most of your tests and spark-testing-base only for a few, more rigorous tests. The choice isn't binary; you can do both.

Integration Testing with Spark Streaming

Finally, I would just like to present a snippet of what a Spark Streaming integration test setup with in-memory values might look like without spark-testing-base:

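import scala.collection.mutable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
// The queue backs the DStream; each RDD added to it becomes one batch.
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream[(String, String)] = streamingContext.queueStream(strings)
strings += rdd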

This is simpler than it looks. It really just turns a sequence of data into a queue to feed to the DStream. Most of it is really just boilerplate setup that works with the Spark APIs. Regardless, you can compare this with StreamingSuiteBase as found in spark-testing-base to decide which you prefer.

This might be my longest post ever, so I will leave it here. I hope others chime in with other ideas to help improve the quality of our analytics with the same agile software engineering practices that have improved all other application development.

And with apologies for the shameless plug, you can check out our course Software Engineering with Apache Spark, where we address a lot of these ideas and more. We hope to have an online version soon.

Vidya
  • Thanks for the detailed answer, but using the loan pattern will make you start and stop the Spark context for each test case defined. Is the only way to avoid this to use the answer provided by koiralo? – Akhil Mar 01 '21 at 14:16
  • This should be selected as the right answer to the question. – nomadSK25 Feb 27 '22 at 23:52
29

You can write a simple test with FunSuite and BeforeAndAfterEach, like below:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, FunSuite}

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession: SparkSession = _

  // Create a local session before each test.
  override def beforeEach(): Unit = {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      // add .config(key, value) calls here as needed
      .getOrCreate()
  }

  test("your test name here") {
    // your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  // Stop the session after each test.
  override def afterEach(): Unit = {
    sparkSession.stop()
  }
}

You don't need to define separate functions in the test class; you can simply write:

test ("test name") {//implementation and assert}

Holden Karau has written a really nice testing library, spark-testing-base.

It is worth checking out. Below is a simple example:

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

Hope this helps!

koiralo
  • Great answer. The [spark-spec](https://github.com/MrPowers/spark-spec) used a similar approach, but it was too slow when a lot of test files were added to the project. See my answer for an alternate implementation that doesn't force the SparkSession to be stopped / started after each test file. – Powers May 04 '17 at 14:06
  • I like the first part of this answer too; I just wish the second example had Spark stuff in it instead of a toy assertion. Beyond that though, I would point out that the notion of performing expensive side-effecting before and/or after a suite of tests is not a new idea. As I suggest in my answer, ScalaTest has ample mechanisms for that--in this case for managing Spark artifacts-- and you can use those as you would for any other expensive fixtures. At least until the time comes where bringing in a heavier third-party framework is worth it. – Vidya May 05 '17 at 20:05
  • On a side note, ScalaTest and specs2 (which I think does so by default) can both run tests in parallel for speed gains. Build tools can also help. But again, none of this is new. – Vidya May 05 '17 at 20:20
  • I have edited the appropriate test example for spark-testing-base as per your suggestion. Thanks, – koiralo May 06 '17 at 12:06
27

Since Spark 1.6 you could use SharedSparkContext or SharedSQLContext that Spark uses for its own unit tests:

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }
}

Since Spark 2.3 SharedSparkSession is available:

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }
}

UPDATE:

Maven dependency:

<dependency>
  <groupId>org.scalactic</groupId>
  <artifactId>scalactic</artifactId>
  <version>SCALATEST_VERSION</version>
</dependency>
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest</artifactId>
  <version>SCALATEST_VERSION</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

SBT dependency:

"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

In addition, you can check the test sources of Spark itself, which contain a huge set of test suites.

UPDATE 2:

Apache Spark Unit Testing Part 1 — Core Components

Apache Spark Unit Testing Part 2 — Spark SQL

Apache Spark Unit Testing Part 3 — Streaming

Apache Spark Integration Testing

Test Driven Development of Apache Spark applications

Eugene Lopatkin
  • do you know which maven package contains this class? – James Gan Jun 19 '18 at 00:02
  • Of course. Both of them are in `"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"` – Eugene Lopatkin Jun 19 '18 at 05:55
  • For Maven: `<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql</artifactId><version>SPARK_VERSION</version><type>test-jar</type><scope>test</scope></dependency>` – Eugene Lopatkin Jun 19 '18 at 06:03
  • For me it was also necessary to add _sources_ of spark-core and spark-catalyst with `libraryDependencies += "org.apache.spark" %% "spark-core" % SPARK_VERSION withSources()` `libraryDependencies += "org.apache.spark" %% "spark-catalyst" % SPARK_VERSION withSources()` – rad i Mar 13 '19 at 09:49
  • Good day, Huang! What do you mean by "Cannot resolve symbol test"? Where did it happen? – Eugene Lopatkin Apr 26 '19 at 08:34
  • Also, ensure [scalatest version matches what spark is using](https://stackoverflow.com/a/59274024/1080804) – ecoe Dec 10 '19 at 20:59
16

I like to create a SparkSessionTestWrapper trait that can be mixed in to test classes. Shankar's approach works, but it's prohibitively slow for test suites with multiple files.

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

The trait can be used as follows:

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

Check the spark-spec project for a real-life example that uses the SparkSessionTestWrapper approach.

Update

The spark-testing-base library automatically adds the SparkSession when certain traits are mixed in to the test class (e.g. when DataFrameSuiteBase is mixed in, you'll have access to the SparkSession via the spark variable).

I created a separate testing library called spark-fast-tests to give the users full control of the SparkSession when running their tests. I don't think a test helper library should set the SparkSession. Users should be able to start and stop their SparkSession as they see fit (I like to create one SparkSession and use it throughout the test suite run).

Here's an example of the spark-fast-tests assertSmallDatasetEquality method in action:

import com.github.mrpowers.spark.fast.tests.DatasetComparer
import org.apache.spark.sql.functions.col

class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

  it("aliases a DataFrame") {

    val sourceDF = Seq(
      ("jose"),
      ("li"),
      ("luisa")
    ).toDF("name")

    val actualDF = sourceDF.select(col("name").alias("student"))

    val expectedDF = Seq(
      ("jose"),
      ("li"),
      ("luisa")
    ).toDF("student")

    assertSmallDatasetEquality(actualDF, expectedDF)

  }

}
Powers
  • In this approach how do you recommend adding `sparkSession.stop()` somewhere? – Neil Best Jun 05 '17 at 18:44
  • You shouldn't need to `sparkSession.stop()` @NeilBest. The Spark Session will be shut down when the test suite finishes running. – Powers Jun 06 '17 at 20:33
  • Why is there no need to call sparkSession.stop()? @Shankar Koirala's answer stops the SparkSession; is that unnecessary? – yuxh Aug 28 '18 at 02:29
  • @yuxh - Shankar's answer starts and stops the Spark session after every test. This approach works, but it's really slow because it takes a while to start a Spark session. – Powers Aug 29 '18 at 05:21
  • But he also mentions spark-testing-base, where SharedSparkContext stops the context after all test cases. I don't see any code in your SparkSessionTestWrapper that stops it, even after all test cases. – yuxh Aug 30 '18 at 06:48
  • @yuxh - I updated my answer to specifically discuss spark-testing-base and spark-fast-tests. Let me know if this helps. And yes, I never explicitly shut down the SparkSession. I just let the program finish running and then the SparkSession gets shut down automatically. – Powers Sep 01 '18 at 20:58
  • I'm not good at Scala or Spark; can you tell me how the SparkSession gets shut down automatically? – yuxh Sep 13 '18 at 03:44
1

I was able to solve the problem with the code below, after adding the spark-hive dependency to the project's pom:

class DataFrameTest extends FunSuite with DataFrameSuiteBase {
  test("test dataframe") {
    val sparkSession = spark
    import sparkSession.implicits._
    val df = sparkSession.read.format("csv").load("path/to/csv")
    // rest of the operations
  }
}
sunitha
1

Another way to unit test, using JUnit:

import org.apache.spark.sql.SparkSession
import org.junit.Assert._
import org.junit.{After, Before, Test}

class SessionSparkTest {
  var spark: SparkSession = _

  @Before
  def beforeFunction(): Unit = {
    //spark = SessionSpark.getSparkSession()
    spark = SparkSession.builder().appName("App Name").master("local").getOrCreate()
    System.out.println("Before Function")
  }

  @After
  def afterFunction(): Unit = {
    spark.stop()
    System.out.println("After Function")
  }

  @Test
  def testRddCount() = {
    val rdd = spark.sparkContext.parallelize(List(1, 2, 3))
    val count = rdd.count()
    assertTrue(3 == count)
  }

  @Test
  def testDfNotEmpty() = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums")
    assertFalse(numDf.head(1).isEmpty)
  }

  @Test
  def testDfEmpty() = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num])
    assertTrue(emptyDf.head(1).isEmpty)
  }
}

case class Num(id: Int)
Thirupathi Chavati