I am running the following Beam pipeline code locally, with the FlinkRunner. PubsubIO is used to read messages from a topic.
I have a separate thread that publishes messages to the topic at regular intervals (every 30 seconds) and also sets the "ts" attribute which is used later to derive the event time.
Custom transform to convert to KV pair -
private static class PubSubMessageGrouper extends DoFn<PubsubMessage, KV<String, PubsubMessage>> {
@ProcessElement
public void processElement(ProcessContext c) {
PubsubMessage element = c.element();
KV<String, PubsubMessage> kv = KV.of(element.getAttribute("key"), element);
c.output(kv);
}
}
Note that "key" is a key set in the message attributes earlier in the publisher thread. The intent is to group the messages downstream by this key.
Pipeline code -
PCollection<PubsubMessage> pubsubColl = p
.apply(PubsubIO.readMessagesWithAttributes()
.withTimestampAttribute("ts")
.fromTopic("projects/" + projectName + "/topics/beamtest")
);
PCollection<KV<String, PubsubMessage>> idfied =
pubsubColl.apply(ParDo.of(new PubSubMessageGrouper()));
PCollection<KV<String, PubsubMessage>> windowed = idfied
.apply(Window.<KV<String, PubsubMessage>>into(FixedWindows.of(Duration.standardSeconds(15)))
.triggering(
Repeatedly.forever(
AfterWatermark.pastEndOfWindow()
)
)
.withAllowedLateness(Duration.standardSeconds(15))
.discardingFiredPanes());
PCollection<KV<String, Iterable<PubsubMessage>>> grouped = windowed.apply(GroupByKey.create());
grouped.apply(ParDo.of(new KVPrinter()));
The transforms are not chained for ease of reading. The KVPrinter transform in the end is just to print out the messages received from the group by, which will be subsequently replaced by actual code once I get this running.
When I run this, I don't find the trigger executing for quite some time (a couple of minutes or longer). When it finally triggers, I see that some of the messages are not received (in the final step). Is this due to the internal watermark that PubsubIO uses? My intention here is to make sure that all messages are processed in the groupby, including late ones within the allowed lateness window.
I have looked at related questions but without getting much help from them. Running the same thing with DirectRunner produces no output at all, regardless of how long I wait. Another point to note is that if I remove the GroupBy part of the pipeline, the triggers are fired.
What is the watermark heuristic for PubsubIO running on GCD?
Apache Beam PubSubIO with GroupByKey
Consuming unbounded data in windows with default trigger
Note: I'm tagging this with Dataflow too even though I haven't tested this on Dataflow. The reasons are that I intend to deploy this on Dataflow, and also to ask for help from people from the Dataflow team who are watching the tag.