
Every second, a message is published to Pub/Sub. Dataflow reads these messages with ReadFromPubSub and groups them into one-minute FixedWindows. I see a delay of about 6 seconds between windows, and I am losing messages: each window has between 57 and 63 events instead of 60. If I increase the number of messages published to Pub/Sub, the spread in per-window counts grows as well. How can I close the gap between windows and receive all of a minute's messages in a single window?

Degrijo
  • I think this is explained in the [watermark and late data documentation](https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data). You can allow late data by invoking the .withAllowedLateness operation (Java SDK; in the Python SDK, the allowed_lateness argument of beam.WindowInto) when you set your PCollection's windowing strategy. The example in that documentation shows a windowing strategy that allows late data up to two days after the end of a window. For details of the semantics, see this [post](https://stackoverflow.com/questions/42169004/what-is-the-watermark-heuristic-for-pubsubio-running-on-gcd). – Enrique Zetina Sep 08 '20 at 22:18
  • I think @EnriqueZetina's comment is the right answer. Share your code if you'd like us to try it and update it. – guillaume blaquiere Sep 09 '20 at 07:38
  • `(pipeline | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic) | 'Window into Fixed Intervals' >> beam.WindowInto(window.FixedWindows(60)) | 'Filter Canary events' >> beam.Filter(is_canary_event) | 'Aggregate Count' >> beam.CombineGlobally(logcount).without_defaults() | 'Write aggregated sums to GCS' >> beam.ParDo(WriteToGCS(output_path)))` – Degrijo Sep 09 '20 at 09:06
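To make the late-data idea from the comments concrete, here is a simplified pure-Python model of the dropping rule. It is not Beam code, and it ignores Beam's exact boundary handling and how the Pub/Sub watermark is estimated: an element is discarded once the watermark has passed the end of its window plus the allowed lateness. The function name and parameters are hypothetical; in the Python SDK the real setting is the `allowed_lateness` argument of `beam.WindowInto`.

```python
def is_dropped_as_late(event_ts: float, watermark: float,
                       size: float = 60.0, allowed_lateness: float = 0.0) -> bool:
    """Simplified model: True if an element arriving at `watermark` is discarded.

    All values are in seconds. The element's fixed window ends at the next
    multiple of `size` after `event_ts`; once the watermark has reached
    window_end + allowed_lateness, the window is treated as expired.
    """
    window_end = event_ts - event_ts % size + size
    return watermark >= window_end + allowed_lateness


# An element stamped at 59 s arrives after the watermark has moved to 61 s.
print(is_dropped_as_late(59.0, watermark=61.0))                         # True (dropped)
print(is_dropped_as_late(59.0, watermark=61.0, allowed_lateness=30.0))  # False (kept)
```

Note that, with the default trigger, an element accepted as late produces an additional late firing of its window rather than being folded into the on-time result, so allowed lateness alone does not make each one-minute count come out as exactly 60.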

1 Answer


Apache Beam fixed windows use each element's timestamp to decide which window it belongs to.

See the docs, section 3.2.6, "Element timestamps".
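As a rough illustration of why the timestamp matters, the sketch below reproduces, in plain Python seconds rather than Beam's Timestamp type, the arithmetic FixedWindows uses to map a timestamp to its window start (start = ts - (ts - offset) % size). The helper names are hypothetical. With one message per second and exact event-time timestamps, every one-minute window gets 60 elements; shifting some timestamps by a small delay (a made-up stand-in for publish or ingestion jitter) moves messages near a boundary into the neighboring window.

```python
from collections import Counter


def fixed_window_start(ts: float, size: float = 60.0, offset: float = 0.0) -> float:
    """Start of the fixed window [start, start + size) that contains `ts` (seconds)."""
    return ts - (ts - offset) % size


def count_per_window(timestamps, size: float = 60.0) -> Counter:
    """Count how many timestamps land in each fixed window, keyed by window start."""
    return Counter(fixed_window_start(t, size) for t in timestamps)


# One message per second for two minutes, stamped with its exact event time.
event_times = range(120)
print(dict(sorted(count_per_window(event_times).items())))  # {0.0: 60, 60.0: 60}

# Same messages, but every odd-numbered one is stamped 1.5 s later.
shifted = [t + (1.5 if t % 2 else 0.0) for t in event_times]
print(dict(sorted(count_per_window(shifted).items())))  # {0.0: 59, 60.0: 60, 120.0: 1}
```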

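For Pub/Sub, there are two ways an element can be given a timestamp:

  1. The time the message was published to the Pub/Sub topic.
  2. A value carried in the message itself (a message attribute), used as the element timestamp.

Option 1 is the default; to use option 2, pass the name of that attribute as the timestamp_attribute parameter of ReadFromPubSub.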

So when publishing each message, set an attribute containing its event time, and give that attribute's name as timestamp_attribute. FixedWindows will then, by default, use event-time triggers, which, if I have understood your use case correctly, is what you are looking for.

Reza Rokni