I performed a simple group-by on year with some aggregations, as shown below, and tried to append the result to an HDFS path. I am getting the following error:
org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark;;
Aggregate [year#88], [year#88, sum(rating#89) AS rating#173,
sum(cast(duration#90 as bigint)) AS duration#175L]
+- EventTimeWatermark event_time#96: timestamp, interval 10 seconds
Below is my code. Can someone please help?
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("mddd").
  enableHiveSupport().config("hive.exec.dynamic.partition", "true").
  config("hive.exec.dynamic.partition.mode", "nonstrict").
  config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
  config("spark.debug.maxToStringFields", 100).
  getOrCreate()
import spark.implicits._
val mySchema = StructType(Array(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("year", IntegerType),
StructField("rating", DoubleType),
StructField("duration", IntegerType)
))
val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/")
import java.util.Calendar
val df_agg_without_time = xmlData.withColumn("event_time",
  to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
val df_agg_with_time = df_agg_without_time.
  withWatermark("event_time", "10 seconds").
  groupBy($"year").
  agg(sum($"rating").as("rating"), sum($"duration").as("duration"))
val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))
df_agg_with_time.writeStream.
  outputMode("append").
  partitionBy("year").
  format("csv").
  option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").
  start()
My input is in CSV format:
id,name,year,rating,duration
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733
My expected output should be written to HDFS, partitioned by year:
year,rating,duration
1993,7.4,8956
1921,6.0,15212
1963,13.5,22515
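To double-check the per-year totals for the sample rows, here is a minimal plain-Scala sketch (no Spark, no streaming) that groups the eight input rows by year and sums rating and duration. The names `Movie` and `ExpectedTotals` are hypothetical and exist only for this illustration, and `BigDecimal` is used in place of `Double` so the decimal sums are exact. It only illustrates the aggregation I am after, not the streaming job itself.

```scala
// Hypothetical helper types for illustration only; not part of the streaming job.
final case class Movie(id: Int, name: String, year: Int, rating: BigDecimal, duration: Long)

object ExpectedTotals {
  // The eight sample rows from the input CSV above.
  val rows: Seq[Movie] = Seq(
    Movie(1, "The Nightmare Before Christmas", 1993, BigDecimal("3.9"), 4568L),
    Movie(2, "The Mummy", 1993, BigDecimal("3.5"), 4388L),
    Movie(3, "Orphans of the Storm", 1921, BigDecimal("3.2"), 9062L),
    Movie(4, "The Object of Beauty", 1921, BigDecimal("2.8"), 6150L),
    Movie(5, "Night Tide", 1963, BigDecimal("2.8"), 5126L),
    Movie(6, "One Magic Christmas", 1963, BigDecimal("3.8"), 5333L),
    Movie(7, "Muriel's Wedding", 1963, BigDecimal("3.5"), 6323L),
    Movie(8, "Mother's Boys", 1963, BigDecimal("3.4"), 5733L)
  )

  // Group by year, then sum rating and duration within each group.
  def totals(input: Seq[Movie]): Map[Int, (BigDecimal, Long)] =
    input.groupBy(_.year).map { case (year, movies) =>
      val ratingSum = movies.map(_.rating).sum
      val durationSum = movies.map(_.duration).sum
      (year, (ratingSum, durationSum))
    }

  def main(args: Array[String]): Unit =
    // Print one "year,rating,duration" line per year, ordered by year.
    totals(rows).toSeq.sortBy(_._1).foreach { case (year, (rating, duration)) =>
      println(s"$year,$rating,$duration")
    }
}
```

Calling `ExpectedTotals.totals(ExpectedTotals.rows)` returns a map keyed by year, so individual totals can be compared against the expected output above.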
I am really not sure what's wrong with my approach. Please help.