
I'm trying to test Spark Structured Streaming, and failing. How can I test it properly?

I followed the general Spark testing question from here, and my closest attempt looks something like this:

import simpleSparkTest.SparkSessionTestWrapper
import org.scalatest.FunSpec  
import org.apache.spark.sql.types.{StringType, IntegerType, DoubleType, StructType, DateType}
import org.apache.spark.sql.streaming.OutputMode

class StructuredStreamingSpec extends FunSpec with SparkSessionTestWrapper {

  describe("Structured Streaming") {

    it("Read file from system") {

      val schema = new StructType()
        .add("station_id", IntegerType)
        .add("name", StringType)
        .add("lat", DoubleType)
        .add("long", DoubleType)
        .add("dockcount", IntegerType)
        .add("landmark", StringType)
        .add("installation", DateType)

      val sourceDF = spark.readStream
        .option("header", "true")
        .schema(schema)
        .csv("/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv")
        .coalesce(1)

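      // this eager count() on a streaming DataFrame is what throws the AnalysisException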
      val countSource = sourceDF.count()

      val query = sourceDF.writeStream
        .format("memory")
        .queryName("Output")
        .outputMode(OutputMode.Append())
        .start()
        .processAllAvailable()

      assert(countSource === 70)
    }

  }

}

Sadly, it always fails with `org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()`.

I also found this issue at the spark-testing-base repo and wonder whether it is even possible to test Spark Structured Streaming at all.

I want to have integration tests, and maybe even use Kafka on top for testing checkpointing or specific corrupt-data scenarios. Can someone help me out?

Last but not least, I figure the version may also be a constraint: I currently develop against 2.1.0, which I need because of the Azure HDInsight deployment options. Self-hosting is an option if that is the blocker.

lony
Try `.trigger(Trigger.Once())` with `query.awaitTermination()` and see if you get a different result: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers – Paul Leclercq Mar 28 '18 at 15:17

2 Answers


Did you solve this?

You are calling count() on a streaming DataFrame before starting the execution with start(), which is exactly what the AnalysisException is complaining about. If you want a count, how about doing this instead?

  sourceDF.writeStream
    .format("memory")
    .queryName("Output")
    .outputMode(OutputMode.Append())
    .start()
    .processAllAvailable()

  // the memory sink registers an in-memory table named after the query
  val results = spark.sql("select * from Output").collect()
  assert(results.length === 70)
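
More generally, you can avoid reading from the file system entirely by pushing test data through Spark's MemoryStream source. Below is a minimal sketch reusing the SparkSessionTestWrapper and OutputMode import from the question; note that MemoryStream sits in the semi-private org.apache.spark.sql.execution.streaming package (it is what Spark's own streaming tests use, and to my knowledge it is already available in 2.1.x):

import org.apache.spark.sql.execution.streaming.MemoryStream

it("Counts rows pushed through a memory stream") {
  import spark.implicits._
  implicit val sqlCtx = spark.sqlContext

  // an in-memory streaming source the test can feed directly
  val events = MemoryStream[Int]
  events.addData(1, 2, 3)

  val query = events.toDS().writeStream
    .format("memory")
    .queryName("MemoryOutput")
    .outputMode(OutputMode.Append())
    .start()

  // block until everything added so far has been processed
  query.processAllAvailable()

  assert(spark.sql("select * from MemoryOutput").count() === 3)
  query.stop()
}

This also gives you a natural place to simulate the corrupt-data scenarios from the question: feed whatever malformed records you like via addData between processAllAvailable() calls.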
Sumeeth

You can also use the StructuredStreamingBase trait from @holdenk's spark-testing-base library: https://github.com/holdenk/spark-testing-base/blob/936c34b6d5530eb664e7a9f447ed640542398d7e/core/src/test/2.2/scala/com/holdenkarau/spark/testing/StructuredStreamingSampleTests.scala

Here's an example of how to use it:

import com.holdenkarau.spark.testing.{SharedSparkContext, StructuredStreamingBase}
import org.apache.spark.sql.Dataset
import org.scalatest.FunSuite

class StructuredStreamingTests extends FunSuite
    with SharedSparkContext with StructuredStreamingBase {

  override implicit def reuseContextIfPossible: Boolean = true

  test("add 3") {
    import spark.implicits._
    // each inner List is fed to the stream as one batch of input
    val input = List(List(1), List(2, 3))
    val expected = List(4, 5, 6)
    def compute(input: Dataset[Int]): Dataset[Int] = {
      input.map(elem => elem + 3)
    }
    testSimpleStreamEndState(spark, input, expected, "append", compute)
  }
}
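
Note that the linked sample sits under the library's 2.2 test sources, so this trait needs Spark 2.2+, which matters given the 2.1.0 constraint in the question. If you want to try it, the sbt coordinates look something like the following sketch (the version string is illustrative; check the project's README for the exact Spark-to-release mapping):

// spark-testing-base is versioned per Spark release, e.g. <sparkVersion>_<libraryVersion>
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.8.0" % "test"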
mahmoud mehdi