PubSub receives one message every second. Dataflow reads these messages with ReadFromPubSub and groups them with FixedWindows once per minute. I see a delay of about 6 seconds between windows, and messages appear to be lost (each window has 57 to 63 events). If I increase the number of messages in PubSub, the spread in per-window counts increases too. How can I close the gap between windows and receive all of a minute's messages in one window?
-
I think this is explained in the [watermark and late data documentation](https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data). You can allow late data by invoking the .withAllowedLateness operation when you set your PCollection's windowing strategy. The code example in that documentation demonstrates a windowing strategy that allows late data up to two days after the end of a window. For details of the semantics, see this [post](https://stackoverflow.com/questions/42169004/what-is-the-watermark-heuristic-for-pubsubio-running-on-gcd). – Enrique Zetina Sep 08 '20 at 22:18
-
I think @EnriqueZetina's comment is the right answer. Share your code if you want us to try it and suggest an update. – guillaume blaquiere Sep 09 '20 at 07:38
-
`(pipeline | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic) | 'Window into Fixed Intervals' >> beam.WindowInto(window.FixedWindows(60)) | 'Filter Canary events' >> beam.Filter(is_canary_event) | 'Aggregate Count' >> beam.CombineGlobally(logcount).without_defaults() | 'Write aggregated sums to GCS' >> beam.ParDo(WriteToGCS(output_path)))` – Degrijo Sep 09 '20 at 09:06
1 Answer
Apache Beam fixed windows assign each element to a window based on the element's timestamp.
See the docs, section 3.2.6, "Element timestamps".
For Pub/Sub, there are two ways an element can be given a timestamp:
1. The time the element was published to the Pub/Sub topic.
2. A message attribute value used as the element timestamp.
Option 1 is the default; to use option 2, pass `timestamp_attribute` to `ReadFromPubSub`.
This means that when you publish a message, you set the attribute named by `timestamp_attribute` in the message metadata. `FixedWindows` will then use event-time triggering by default, which, if I have understood your use case correctly, is what you are looking for.
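To illustrate why the choice of timestamp matters, here is a minimal pure-Python sketch (not Beam code) that mimics how 60-second fixed windows bucket elements. The `assign_window` and `count_per_window` helpers, the one-event-per-second data, and the 0 to 4 second publish delay are all assumptions made up for this illustration; they are not taken from the question's pipeline.

```python
from collections import Counter

WINDOW_SIZE = 60  # seconds, matching FixedWindows(60) in the question


def assign_window(timestamp, size=WINDOW_SIZE):
    """Return the [start, end) fixed window that contains `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def count_per_window(timestamps, size=WINDOW_SIZE):
    """Count how many timestamps fall into each fixed window."""
    counts = Counter(assign_window(ts, size) for ts in timestamps)
    return dict(sorted(counts.items()))


# Hypothetical data: one event per second for three minutes.
event_times = list(range(180))
# Assumed publish delay of 0 to 4 seconds, chosen deterministically for the sketch.
publish_times = [t + (t * 7) % 5 for t in event_times]

# Bucketing by the delayed timestamp spreads events unevenly across windows.
by_publish_time = count_per_window(publish_times)
# Bucketing by the event time carried with the message gives 60 per window.
by_event_time = count_per_window(event_times)

print("by publish time:", by_publish_time)
print("by event time:  ", by_event_time)
```

With the delayed timestamps, events near a window boundary slip into the next window, so the per-window counts drift away from 60. With the event time carried alongside each message, every window gets exactly its own minute's events, which is the behaviour that setting a timestamp attribute is meant to achieve.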

Reza Rokni