
I use Spark 2.2.0-rc1.

I've got a Kafka topic on which I'm running a watermarked aggregation, with a 1-minute watermark, writing to the console in append output mode.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value")).
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy("time").
  count.
  writeStream.
  outputMode("append").
  format("console").
  start

I am pushing the following data into the Kafka topic:

{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}

And I am getting the following output:

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

Is this expected behaviour?

zero323
himanshuIIITian
  • Having the same problem with Spark 2.1. I'm reading a stream from disk and using `.withWatermark` with a `groupBy(window(...))` aggregation; no data is being output. Without watermarking, data is processed normally. – Rayan Ral Jun 08 '17 at 14:23
  • It seems to be a bug, like this one: https://issues.apache.org/jira/browse/SPARK-20065. – himanshuIIITian Jun 08 '17 at 16:47
  • @RayanRal Are you using "complete" mode without watermarking and "append" mode with watermarking? – zsxwing Jun 08 '17 at 20:22
  • @zsxwing Tried both: `complete` mode without watermarking, output is OK; `append` mode with watermarking, output is empty. – Rayan Ral Jun 09 '17 at 06:25
  • @zsxwing It looks like it really waits for new files in order to flush old ones into a batch. When I just put in 4 files (each with a timestamp 1 hour apart) and set the watermark to 1 hour, it didn't write output. When I add files during the run, one by one, it creates batches, but with some lag: 2 more hours of data need to be added before a batch is formed. Is there any way to create a batch per file, rather than by column value? – Rayan Ral Jun 09 '17 at 06:49
  • Just in case anyone is looking here for an answer to the problem in my comment: there's an option `maxFilesPerTrigger` that you can specify when reading from disk (see the sketch after these comments). And here's a great article series: https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html – Rayan Ral Jun 09 '17 at 11:20
  • @RayanRal Explaining your 2-hours problem here: https://gist.github.com/zsxwing/18ee545ac640d3009b793763a2e7ec8b – zsxwing Jun 09 '17 at 21:47
  • I understand this works for you in complete mode but not append. Can you post the results for complete mode? – Assaf Mendelson Jul 06 '17 at 13:48
  • Hello all, is there any solution for this issue? What parameters do we need to change? I am also facing the same issue. – BigD Jan 27 '19 at 10:35
  • @BigD Can you please share your code snippet? Or is it the same as this one? – himanshuIIITian Jan 27 '19 at 12:32
  • I have posted it here; I am getting data reflected only after the third time I add data to the read-stream directory: https://stackoverflow.com/questions/54378219/spark-structured-streaming-delaying-2-batches-of-data-always?noredirect=1#comment95586679_54378219 – BigD Jan 27 '19 at 20:09
  • @himanshuIIITian Can you please let me know how you resolved this? – BigD Jan 28 '19 at 10:39
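
A minimal sketch of the `maxFilesPerTrigger` workaround from Rayan Ral's comment (my own code, not from the thread; the input path, schema, and window aggregation are assumptions):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(StructField("time", TimestampType) :: Nil)

// Read JSON files from a directory, picking up at most one new file per
// micro-batch, so each file forms its own batch and the watermark can
// advance file by file.
val q = spark.
  readStream.
  schema(schema).
  option("maxFilesPerTrigger", "1").
  json("/tmp/events").                          // hypothetical input directory
  withWatermark("time", "1 minute").
  groupBy(window(col("time"), "1 minute")).
  count.
  writeStream.
  outputMode("append").
  format("console").
  start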

2 Answers


Pushing more data to Kafka should trigger Spark to output something. The current behavior is entirely an artifact of the internal implementation.

When you push some data, the StreamingQuery generates a batch to run. When that batch finishes, it remembers the max event time seen in the batch. Then in the next batch, because you are using append mode, the StreamingQuery uses that max event time together with the watermark to evict old values from the StateStore and output them. Therefore you need to generate at least two batches in order to see output.
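
To make this concrete, here's a minimal sketch of my own (not the answerer's code) that drives the same kind of query with Spark's internal MemoryStream test source, so each addData call forms one batch; the names `events` and `query`, and the use of a window aggregation, are my choices:

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._

import spark.implicits._
implicit val sqlCtx = spark.sqlContext

// An in-memory source lets us control exactly which records land in which batch.
val events = MemoryStream[String]

val query = events.toDS.
  select(col("value").cast("timestamp").as("time")).
  withWatermark("time", "1 minute").
  groupBy(window(col("time"), "1 minute")).
  count.
  writeStream.
  outputMode("append").
  format("console").
  start

// Batch 0: records are aggregated into state, but nothing is printed yet.
// The max event time seen here (10:05) sets the next batch's watermark to 10:04.
events.addData("2017-06-07 10:01:00", "2017-06-07 10:05:00")
query.processAllAvailable()

// Batch 1: runs with watermark 10:04, so the 10:01 window (which ends
// well before 10:04) is finally evicted from state and printed.
events.addData("2017-06-07 10:06:00")
query.processAllAvailable()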

zsxwing
  • Thanks for the response! But I generated 4 batches and still got empty output. I have updated my question with all 4 batches. – himanshuIIITian Jun 09 '17 at 04:38
  • I see. Another issue is that your JSON values are invalid. It should be `{"time":"2017-06-07 10:01:00.000"}`. If you use "complete" mode, you will see it just outputs null. – zsxwing Jun 09 '17 at 20:32
  • After correcting the JSON value, I can see the output in `complete` mode but not in `append` mode. Why? – himanshuIIITian Jun 10 '17 at 05:55
  • Also, there are no examples of `watermarking` in Spark. Can you provide me an example of it? – himanshuIIITian Jun 10 '17 at 05:57
  • To quote the Spark documentation: "For a specific window ending at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T)." In other words, late data within the threshold will be aggregated, but data later than the threshold will start getting dropped. – aandis Aug 04 '20 at 14:19
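
To make that rule concrete with the question's numbers: with a 1-minute threshold, once the engine has seen an event at 10:05 the watermark becomes 10:04, so a window ending at 10:02 is finalized and emitted in append mode, and any record for that window arriving afterwards is dropped.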

Here's my best guess:

Append mode only outputs data after the watermark has passed (i.e. in this case, 1 minute later). You didn't set a trigger (e.g. `.trigger(Trigger.ProcessingTime("10 seconds"))`), so by default it outputs batches as fast as possible. So for the first minute all your batches should be empty, and the first batch after a minute should contain some content.
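
For illustration, a sketch of my own showing where such a trigger would go; `aggregated` is a placeholder for the question's watermarked, grouped, counted DataFrame, not a name from the question:

import org.apache.spark.sql.streaming.Trigger

// Only the trigger line is new relative to the question's query;
// it makes Spark attempt a micro-batch every 10 seconds instead of
// as fast as possible.
val q = aggregated.
  writeStream.
  trigger(Trigger.ProcessingTime("10 seconds")).
  outputMode("append").
  format("console").
  start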

Another possibility is that you're using `groupBy("time")` instead of `groupBy(window("time", "[window duration]"))`. I believe watermarks are meant to be used with time windows or `mapGroupsWithState`, so I'm not sure how the interaction works in this case.
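
For reference, a sketch of the question's query with that single change applied, grouping by a 1-minute tumbling window over `time` (my adaptation, not code from the answer; the window duration is an arbitrary choice):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value")).
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy(window(col("time"), "1 minute")).  // the only change: window(...) instead of "time"
  count.
  writeStream.
  outputMode("append").
  format("console").
  start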

Ray J
  • I was having a similar problem while trying to group by the time variable, but changing it to `window(time, ...)` fixed my problem. – Paul Oct 15 '17 at 20:00