I am getting some issues while executing spark SQL on top of spark structures streaming. PFA for error.
here is my code
object sparkSqlIntegration {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("StructuredStreaming")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.config("spark.sql.streaming.checkpointLocation", "file:///C:/checkpoint")
.getOrCreate()
setupLogging()
val userSchema = new StructType().add("name", "string").add("age", "integer")
// Create a stream of text files dumped into the logs directory
val rawData = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///C:/Users/R/Documents/spark-poc-centri/csvFolder")
// Must import spark.implicits for conversion to DataSet to work!
import spark.implicits._
rawData.createOrReplaceTempView("updates")
val sqlResult= spark.sql("select * from updates")
println("sql results here")
sqlResult.show()
println("Otheres")
val query = rawData.writeStream.outputMode("append").format("console").start()
// Keep going until we're stopped.
query.awaitTermination()
spark.stop()
}
}
During execution, I am getting the following error. As I am new to streaming can anyone tell how can I execute spark SQL queries on spark structured streaming
2018-12-27 16:02:40 INFO BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, LAPTOP-5IHPFLOD, 6829, None)
2018-12-27 16:02:41 INFO ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6731787b{/metrics/json,null,AVAILABLE,@Spark}
sql results here
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[file:///C:/Users/R/Documents/spark-poc-centri/csvFolder]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)