I use Spark 2.2.0-rc1.
I have a Kafka topic over which I run a streaming watermarked aggregation, with a 1-minute watermark, writing to the console sink in append output mode.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value")).
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy("time").
  count.
  writeStream.
  outputMode("append").
  format("console").
  start
I am pushing the following data into the Kafka topic:
{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}
And I am getting the following output:
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
Is this expected behaviour?
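For reference, here is my understanding of the watermark arithmetic, as a plain-Scala sketch (not Spark API; the object and value names are mine). After all five rows are seen, the watermark should be the maximum event time minus the 1-minute delay, and append mode should only ever emit groups whose event time has fallen below that watermark:

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object WatermarkSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  // Event times from the five JSON records above.
  val events: Seq[LocalDateTime] = Seq(
    "2017-06-07 10:01:00.000",
    "2017-06-07 10:02:00.000",
    "2017-06-07 10:03:00.000",
    "2017-06-07 10:04:00.000",
    "2017-06-07 10:05:00.000"
  ).map(LocalDateTime.parse(_, fmt))

  // Watermark = max event time seen so far minus the 1-minute delay.
  val watermark: LocalDateTime =
    events.reduce((a, b) => if (a.isAfter(b)) a else b).minus(Duration.ofMinutes(1))

  // Groups strictly below the watermark are the ones append mode may emit.
  val finalized: Seq[LocalDateTime] = events.filter(_.isBefore(watermark))
}
```

By that reasoning the watermark ends up at 10:04:00, so I expected the 10:01, 10:02, and 10:03 groups to show up in the console output at some point, yet every batch is empty.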