Trying to test Spark Structured Streaming ... and failing. How can I test it properly?
I followed the general Spark testing question from here, and my closest try looks something like this:
import simpleSparkTest.SparkSessionTestWrapper
import org.scalatest.FunSpec
import org.apache.spark.sql.types.{StringType, IntegerType, DoubleType, StructType, DateType}
import org.apache.spark.sql.streaming.OutputMode

class StructuredStreamingSpec extends FunSpec with SparkSessionTestWrapper {

  describe("Structured Streaming") {

    it("Read file from system") {

      val schema = new StructType()
        .add("station_id", IntegerType)
        .add("name", StringType)
        .add("lat", DoubleType)
        .add("long", DoubleType)
        .add("dockcount", IntegerType)
        .add("landmark", StringType)
        .add("installation", DateType)

      val sourceDF = spark.readStream
        .option("header", "true")
        .schema(schema)
        .csv("/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv")
        .coalesce(1)

      // this is the line that throws the AnalysisException quoted below
      val countSource = sourceDF.count()

      val query = sourceDF.writeStream
        .format("memory")
        .queryName("Output")
        .outputMode(OutputMode.Append())
        .start()
        .processAllAvailable()

      assert(countSource === 70)
    }
  }
}
Sadly, it always fails with:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
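From the error message I suspect the sourceDF.count() call is the culprit, since count() is a batch action on a streaming DataFrame. A variant I have been considering (untested sketch) would start the query first and then count the rows of the in-memory table that the memory sink registers under the query name:

// untested sketch: count rows via the memory sink table instead of
// calling count() on the streaming DataFrame itself
val query = sourceDF.writeStream
  .format("memory")
  .queryName("Output")
  .outputMode(OutputMode.Append())
  .start()

// block until everything currently available in the source is processed
query.processAllAvailable()

// "Output" is the in-memory table created via queryName above
val countSource = spark.sql("select * from Output").count()
assert(countSource === 70)

query.stop()

Is that the right pattern, or is there a more idiomatic way?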
I also found this issue at the spark-testing-base repo and wonder whether it is even possible to test Spark Structured Streaming at all.
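While digging around I also came across org.apache.spark.sql.execution.streaming.MemoryStream, which Spark's own test suites seem to use as a controllable source. A minimal sketch of how I imagine using it (untested, and it lives in an internal package, so I am not sure it is meant for user code):

import org.apache.spark.sql.execution.streaming.MemoryStream

// MemoryStream needs an implicit SQLContext and an Encoder for the element type
implicit val sqlCtx = spark.sqlContext
import spark.implicits._

val events = MemoryStream[String]

val query = events.toDS()
  .writeStream
  .format("memory")
  .queryName("MemoryOutput")
  .outputMode(OutputMode.Append())
  .start()

// push test data into the stream, then wait until it has been processed
events.addData("station-1", "station-2")
query.processAllAvailable()

assert(spark.sql("select * from MemoryOutput").count() === 2)
query.stop()

Would that be a reasonable approach for unit-level tests?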
I want to have integration tests, and maybe even use Kafka on top, for testing checkpointing or specific corrupt-data scenarios. Can someone help me out?
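For the Kafka part, this is roughly the shape I have in mind (untested; it assumes the spark-sql-kafka-0-10 dependency and an embedded or locally running broker, and the servers, topic name and paths below are placeholders):

// untested sketch of a Kafka-backed integration test
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "stations")
  .load()

val query = kafkaDF
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("parquet")
  .option("path", "/tmp/station-output")
  .option("checkpointLocation", "/tmp/station-checkpoint")
  .outputMode(OutputMode.Append())
  .start()

// the checkpointing test would then stop the query, publish more records,
// restart with the same checkpointLocation and assert that processing
// resumes from the stored offsets without duplicates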
Last but not least, I figure the version may also be a constraint: I currently develop against Spark 2.1.0, which I need because of the Azure HDInsight deployment options. Self-hosting is an option if that turns out to be the blocker.