-4

I want to create a DataFrame on top of a Kafka topic and then register that DataFrame as a temp table so that I can run a minus operation on the data. I have written the code below, but when I query the registered table I get the error "org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "SERVER ******").option("subscribe", "TOPIC_NAME").option("startingOffsets", "earliest").load()

df.printSchema()

val personStringDF = df.selectExpr("CAST(value AS STRING)")

val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))


val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")

personDF.registerTempTable("final_df1")

spark.sql("select * from final_df1").show 

Error: "org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"

I have also tried the start() method, and I get the error below.

20/08/11 00:59:30 ERROR streaming.MicroBatchExecution: Query final_df1 [id = 1a3e2ea4-2ec1-42f8-a5eb-8a12ce0fb3f5, runId = 7059f3d2-21ec-43c4-b55a-8c735272bf0f] terminated with error java.lang.AbstractMethodError

NOTE: My main objective in writing this script is to run a minus query on this data and compare it with a table I have registered on the cluster. To summarise: if I send 1000 records from an Oracle database to a Kafka topic, I create a DataFrame on top of the Oracle table and register it as a temp table, and I do the same with the Kafka topic. Then I want to run a minus query between the source (Oracle) and the target (Kafka topic) to perform 100% data validation between the two. (Is it possible to register a Kafka topic as a temporary table?)

  • Hi Parth, welcome to the tech community. You could refer to https://stackoverflow.com/questions/40609771/queries-with-streaming-sources-must-be-executed-with-writestream-start – hagarwal Aug 10 '20 at 05:29

2 Answers

3

Use the memory sink instead of registerTempTable. See the code below.

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "SERVER ******")
.option("subscribe", "TOPIC_NAME")
.option("startingOffsets", "earliest")
.load()

df.printSchema()

val personStringDF = df.selectExpr("CAST(value AS STRING)")

val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))


val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")


personDF
.writeStream
.outputMode("append")
.format("memory")
.queryName("final_df1").start()

spark.sql("select * from final_df1").show(10,false)
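One caveat with the snippet above: start() returns immediately, so the select can run before any micro-batch has been written and show an empty table. A minimal sketch of one way around that, assuming it is run in spark-shell or through spark-submit with the Kafka connector on the classpath, is to call processAllAvailable() on the returned StreamingQuery before querying. The Spark API docs describe that method as intended for testing, so treat this as a debugging aid rather than a production pattern. The names userSchema and query are only illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// In spark-shell a session already exists and getOrCreate() simply reuses it.
val spark = SparkSession.builder().appName("kafka-memory-sink-sketch").getOrCreate()

// The same seven nullable string columns as in the question.
val userSchema = StructType(
  Seq("OEM", "IMEI", "CUSTOMER_ID", "REQUEST_SOURCE", "REQUESTER", "REQUEST_TIMESTAMP", "REASON_CODE")
    .map(name => StructField(name, StringType, nullable = true)))

val personDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "SERVER ******") // placeholder from the question
  .option("subscribe", "TOPIC_NAME")                  // placeholder from the question
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json(col("value"), userSchema).as("data"))
  .select("data.*")

// Keep a handle on the running query so it can be waited on and stopped.
val query = personDF.writeStream
  .outputMode("append")
  .format("memory")
  .queryName("final_df1")
  .start()

// Block until the data currently available in the topic has reached the sink.
query.processAllAvailable()

spark.sql("select * from final_df1").show(10, false)

// Stop the background stream once the snapshot has been inspected.
query.stop()
```

Keeping the StreamingQuery handle also makes it possible to check query.exception or query.lastProgress if the stream terminates with an error.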

Srinivas
  • After using the start() method, I get this error: 20/08/11 00:59:30 ERROR streaming.MicroBatchExecution: Query final_df1 [id = 1a3e2ea4-2ec1-42f8-a5eb-8a12ce0fb3f5, runId = 7059f3d2-21ec-43c4-b55a-8c735272bf0f] terminated with error java.lang.AbstractMethodError – Parth Lakhtaria Aug 11 '20 at 06:01
  • Also, my main objective in writing this script is to run a minus query on this data and compare it with a table I have registered on the cluster. To summarise: if I send 1000 records from an Oracle database to a Kafka topic, I create a DataFrame on top of the Oracle table and register it as a temp table, and I do the same with the Kafka topic. Then I want to run a minus query between the source (Oracle) and the target (Kafka topic) to perform 100% data validation between the two. – Parth Lakhtaria Aug 11 '20 at 06:05
  • The `memory` sink is not meant for production use cases; consider an alternative. – Srinivas Aug 12 '20 at 04:49
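One possible alternative for the validation use case, sketched under assumptions: the Kafka source also supports batch reads through spark.read (instead of spark.readStream), which return an ordinary bounded DataFrame that can be registered as a temp view and compared with EXCEPT. In the sketch below, the helper names parseValues and readTopicAsBatch, the view names oracle_source and kafka_target, and the two sample rows are all hypothetical. The in-memory stand-ins are there only so the comparison can be tried without Oracle or Kafka, and the local master is only for experimenting.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Local master only for trying the sketch out; spark-shell reuses its existing session.
val spark = SparkSession.builder().master("local[*]").appName("kafka-minus-sketch").getOrCreate()
import spark.implicits._

// The same seven nullable string columns as in the question.
val userSchema = StructType(
  Seq("OEM", "IMEI", "CUSTOMER_ID", "REQUEST_SOURCE", "REQUESTER", "REQUEST_TIMESTAMP", "REASON_CODE")
    .map(name => StructField(name, StringType, nullable = true)))

// Parse a `value` column holding JSON into the seven columns (hypothetical helper).
def parseValues(raw: DataFrame): DataFrame =
  raw.selectExpr("CAST(value AS STRING) AS value")
    .select(from_json(col("value"), userSchema).as("data"))
    .select("data.*")

// Batch read of a topic: spark.read yields a bounded, non-streaming DataFrame (hypothetical helper).
def readTopicAsBatch(servers: String, topic: String): DataFrame =
  parseValues(
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load())

// Stand-in rows (made up for illustration) in place of the Oracle table and the topic.
val oracleRows = parseValues(
  Seq("""{"OEM":"A","IMEI":"1"}""", """{"OEM":"B","IMEI":"2"}""").toDF("value"))
val kafkaRows = parseValues(Seq("""{"OEM":"A","IMEI":"1"}""").toDF("value"))
// With a real topic: val kafkaRows = readTopicAsBatch("SERVER ******", "TOPIC_NAME")

oracleRows.createOrReplaceTempView("oracle_source")
kafkaRows.createOrReplaceTempView("kafka_target")

// Rows present in the source but absent from the target.
val missing = spark.sql("SELECT * FROM oracle_source EXCEPT SELECT * FROM kafka_target")
missing.show(false)
```

EXCEPT compares columns by position, so both views need the same column order and types. Running the query in the opposite direction as well would reveal rows that exist only in the topic.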
1

A streaming DataFrame doesn't support the show() method. When you call start(), Spark starts a background thread that streams the input data to the sink. With the console sink, the data is printed to the console and you don't need to call show(). With the memory sink used below, the data is written to an in-memory table named after the query, which you can then read with spark.sql.

Remove the lines below:

personDF.registerTempTable("final_df1")
spark.sql("select * from final_df1").show 

and add the lines below, or equivalent ones, instead:

val query1 = personDF.writeStream.queryName("final_df1").format("memory").outputMode("append").start()
query1.awaitTermination()
sathya
  • After using the above method with start(), I get the error below: 20/08/11 00:59:30 ERROR streaming.MicroBatchExecution: Query final_df1 [id = 1a3e2ea4-2ec1-42f8-a5eb-8a12ce0fb3f5, runId = 7059f3d2-21ec-43c4-b55a-8c735272bf0f] terminated with error java.lang.AbstractMethodError – Parth Lakhtaria Aug 11 '20 at 06:01
  • Also, my main objective in writing this script is to run a minus query on this data and compare it with a table I have registered on the cluster. To summarise: if I send 1000 records from an Oracle database to a Kafka topic, I create a DataFrame on top of the Oracle table and register it as a temp table, and I do the same with the Kafka topic. Then I want to run a minus query between the source (Oracle) and the target (Kafka topic) to perform 100% data validation between the two. – Parth Lakhtaria Aug 11 '20 at 06:05