I'm currently attempting to use withIdAttribute with PubsubIO to deduplicate messages that come from PubSub (since PubSub only guarantees at-least-once delivery).

My messages have four fields: label1, label2, timestamp, and value. A value is unique to the two labels at a given timestamp. Therefore, before publishing to PubSub, I also set a uniqueID attribute equal to these three values joined into a single string.
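As an illustration of that scheme, here is a minimal Java sketch that builds the two attributes for one message using only `java.time`. The class and method names (`MessageAttributes`, `uniqueId`, `forMessage`) are hypothetical, and handing the resulting map to a publisher (for example Beam's `PubsubMessage(byte[], Map<String, String>)` constructor together with `PubsubIO.writeMessages()`) is assumed rather than shown.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds the attributes published alongside each JSON payload.
final class MessageAttributes {
  private MessageAttributes() {}

  // uniqueID: the three fields that identify a value, joined with ':'.
  static String uniqueId(String label1, String label2, long timestampSeconds) {
    return label1 + ":" + label2 + ":" + timestampSeconds;
  }

  static Map<String, String> forMessage(String label1, String label2, long timestampSeconds) {
    Map<String, String> attrs = new LinkedHashMap<>();
    // RFC 3339 UTC string, one of the forms withTimestampAttribute("eventTime") accepts.
    attrs.put("eventTime", Instant.ofEpochSecond(timestampSeconds).toString());
    attrs.put("uniqueID", uniqueId(label1, label2, timestampSeconds));
    return attrs;
  }

  public static void main(String[] args) {
    // Values taken from the sample message in the question.
    System.out.println(
        forMessage("5c381a51-2873-49b8-acf5-60a0fa59fc65", "foobarbaz", 1513199383L));
  }
}
```

For the sample message, this yields the same eventTime and uniqueID values that appear in the gcloud output below. Joining with ":" assumes the labels never contain a colon; if they might, a delimiter that cannot occur in a label would avoid key collisions.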

For example, this is what I get when pulling from the subscription with the gcloud command-line tool:

┌───────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                    DATA                                                   │   MESSAGE_ID   │                                               ATTRIBUTES                                          │
├───────────────────────────────────────────────────────────────────────────────────────────────────────────┼────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"label1":"5c381a51-2873-49b8-acf5-60a0fa59fc65","label2":"foobarbaz","timestamp":1513199383,"value":4.2} │ 11185357338249 │ eventTime=2017-12-13T21:09:43Z uniqueID=5c381a51-2873-49b8-acf5-60a0fa59fc65:foobarbaz:1513199383 │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────┘

In my Beam job, running on Google Cloud Dataflow, I decode these messages as JSON, window them, group them by their two labels, and then aggregate them. However, in my aggregation DoFn, CreateMyAggregationsFn, I'm seeing duplicate messages that have the same label1, label2, and timestamp.

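import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.extensions.jackson.ParseJsons;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class MyBeam {
  public interface MyBeamOptions extends PipelineOptions {
    // Subscription to read from, in the form projects/<project>/subscriptions/<name>.
    String getPubsubSubscription();
    void setPubsubSubscription(String value);
    // ...
  }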

  private static class MyMessage implements Serializable {
    public long timestamp;
    public double value;
    public String label1;
    public String label2;
  }

  private static class CreateMyAggregationsFn extends DoFn<KV<String, Iterable<MyMessage>>, MyAggregate> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      ArrayList<MyMessage> messages = new ArrayList<>();
      c.element().getValue().forEach(messages::add);
      Collections.sort(messages, (msg1, msg2) -> Long.compare(msg1.timestamp, msg2.timestamp));

      MyMessage prev = null;
      for (MyMessage msg : messages) {
        if (prev != null &&
            msg.timestamp == prev.timestamp && 
            msg.label1.equals(prev.label1) && 
            msg.label2.equals(prev.label2)) {
            // ... identifying duplicates here
        }
        prev = msg;
      }
      // ...
    }
  }

  public static void main(String[] args) throws IOException {
    MyBeamOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyBeamOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    PubsubIO.Read<String> pubsubReadSubscription =
        PubsubIO.readStrings()
            .withTimestampAttribute("eventTime")
            .withIdAttribute("uniqueID")
            .fromSubscription(options.getPubsubSubscription());
    pipeline
        .apply("PubsubReadSubscription", pubsubReadSubscription)
        .apply("ParseJsons", ParseJsons.of(MyMessage.class))
        .setCoder(SerializableCoder.of(MyMessage.class))
        .apply(
            "Window",
            Window.<MyMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardSeconds(3600)))
        .apply(
            "PairMessagesWithLabels",
            MapElements.into(
                    TypeDescriptors.kvs(
                        TypeDescriptors.strings(), TypeDescriptor.of(MyMessage.class)))
                .via(msg -> KV.of(msg.label1 + ":" + msg.label2, msg)))
        .apply("GroupMessagesByLabels", GroupByKey.<String, MyMessage>create())
        .apply("CreateAggregations", ParDo.of(new CreateMyAggregationsFn()));
    // ... (further transforms elided)
    PipelineResult result = pipeline.run();
  }
}
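The duplicate check in CreateMyAggregationsFn can also be done without sorting first. The sketch below, in plain Java with a hypothetical `Msg` stand-in for `MyMessage`, keeps the first message for each `label1:label2:timestamp` key (the same key used for the uniqueID attribute) and then orders the survivors by timestamp. Inside the DoFn it would be applied to `c.element().getValue()`; the Beam wiring itself is not shown, and `DedupWithinGroup` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MyMessage with the same four fields.
final class Msg {
  final String label1;
  final String label2;
  final long timestamp;
  final double value;

  Msg(String label1, String label2, long timestamp, double value) {
    this.label1 = label1;
    this.label2 = label2;
    this.timestamp = timestamp;
    this.value = value;
  }

  // Same composite key as the uniqueID attribute: label1:label2:timestamp.
  String key() {
    return label1 + ":" + label2 + ":" + timestamp;
  }
}

final class DedupWithinGroup {
  private DedupWithinGroup() {}

  // Keeps the first message seen for each key, then orders the survivors by timestamp.
  static List<Msg> dedupe(Iterable<Msg> grouped) {
    Map<String, Msg> firstByKey = new LinkedHashMap<>();
    for (Msg m : grouped) {
      firstByKey.putIfAbsent(m.key(), m);
    }
    List<Msg> result = new ArrayList<>(firstByKey.values());
    result.sort(Comparator.comparingLong((Msg m) -> m.timestamp));
    return result;
  }

  public static void main(String[] args) {
    List<Msg> group = Arrays.asList(
        new Msg("a", "b", 20L, 1.0),
        new Msg("a", "b", 10L, 4.2),
        new Msg("a", "b", 20L, 1.0)); // redelivered copy of the first message
    System.out.println(dedupe(group).size()); // prints 2
  }
}
```

`putIfAbsent` keeps whichever copy arrives first; since the duplicates here are redeliveries of the same event, which copy wins should not matter. This only removes duplicates that land in the same window and key.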

Is there an additional step to deduplicating messages from PubsubIO with the withIdAttribute method that I'm missing?

Edited by Simson; asked by Devon Peticolas.

1 Answer

You are specifying accumulatingFiredPanes(), which means that in case of multiple firings for a window (e.g. if late data arrives) you are asking successive firings to include all the elements from previous firings, not just new elements. This by definition produces duplication. What are you trying to achieve by specifying accumulatingFiredPanes()?
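To make this concrete, here is a toy model in plain Java (not Beam code; `PaneModes` and `fire` are hypothetical names) of one window that fires once at the watermark and once more for a late element. In accumulating mode each firing re-emits everything seen so far, so earlier elements appear again; in discarding mode each firing carries only the new elements. In Beam, the corresponding switch is calling `.discardingFiredPanes()` instead of `.accumulatingFiredPanes()` on the `Window` transform.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model: each inner list holds the elements that arrived before one firing of a single window.
final class PaneModes {
  private PaneModes() {}

  // accumulating=true mimics accumulatingFiredPanes(): every firing contains all elements so far.
  // accumulating=false mimics discardingFiredPanes(): every firing contains only new elements.
  static List<List<String>> fire(List<List<String>> arrivals, boolean accumulating) {
    List<List<String>> panes = new ArrayList<>();
    List<String> seen = new ArrayList<>();
    for (List<String> batch : arrivals) {
      seen.addAll(batch);
      panes.add(new ArrayList<>(accumulating ? seen : batch));
    }
    return panes;
  }

  public static void main(String[] args) {
    List<List<String>> arrivals = Arrays.asList(
        Arrays.asList("m1", "m2"), // on-time firing at the end of the window
        Arrays.asList("m3"));      // late firing triggered by one late element
    System.out.println(fire(arrivals, true));  // [[m1, m2], [m1, m2, m3]]
    System.out.println(fire(arrivals, false)); // [[m1, m2], [m3]]
  }
}
```

Accumulating mode can still be the right choice when downstream writes are idempotent upserts, because each later pane simply replaces the earlier result.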

Answered by jkff.
  • Yes, I expect duplicate writes downstream of this step; the results are upserted. The duplicates I'm concerned about come from upstream: Pub/Sub occasionally delivers the same event twice, and in my windowing function those elements are not deduplicated the way I would expect `withIdAttribute` to handle them. – Devon Peticolas Dec 18 '17 at 19:19
  • How do you determine that you're receiving duplicates? Are you logging the event ids in one of your ParDos (which one?) and seeing the same id logged multiple times? – jkff Dec 18 '17 at 21:50
  • Thank you for the response. Yes: in the `CreateMyAggregationsFn` ParDo I sort by timestamp and loop over the messages, comparing each one's `label1`, `label2`, and `timestamp` to the previous one's. For now, this is an effective way to manually identify and dedupe the messages. I'm not sure what you mean by `eventId`. In my use case, the windowed and grouped elements that reach `CreateMyAggregationsFn` are instances of the `MyMessage` class, so I'm inspecting their field values. I'm confident that these values match the `uniqueID` attribute. Could you suggest a way to capture that attribute downstream to confirm? – Devon Peticolas Dec 19 '17 at 21:31
  • I've updated my sample code in `CreateMyAggregationsFn` to further clarify how I'm identifying duplicates. – Devon Peticolas Dec 19 '17 at 21:48
  • Hmm: you're asking PubsubIO.read() to deduplicate based on the "uniqueID" attribute, but you're identifying duplicates based on JSON fields in the message payload. Is it possible that you have multiple messages arriving over Pubsub with the same label1 and label2 in the payload, but a different "uniqueID" attribute? You can debug that by adding a ParDo right after PubsubReadSubscription to log the complete message contents, and see if any particular uniqueID appears twice. – jkff Dec 19 '17 at 21:51
  • I can't do it with `readStrings` since that gives me strings and not `PubsubMessage`s. I've simplified my code to just `readMessages()` followed by `.apply(MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via(msg -> { System.out.println(msg.getAttribute("uniqueID")); return msg; }))`. This prints `null`s, suggesting `uniqueID` is not set, even though I see it when I pull the subscription using the `gcloud` tool (as shown above). – Devon Peticolas Dec 19 '17 at 22:27
  • I've additionally tried printing `msg.getAttributeMap()` and `new String(msg.getPayload(), StandardCharsets.UTF_8)` and confirmed that all the attributes I set are missing (I'm also expecting `eventTime`) while the payload is correct. To clarify, this is how I'm now constructing the PubsubIO.Read: `PubsubIO.readMessages().withTimestampAttribute("eventTime").withIdAttribute("uniqueID").fromSubscription(options.getPubsubSubscription())` – Devon Peticolas Dec 19 '17 at 22:39
  • You need to use readMessagesWithAttributes(). readMessages() by default does not include attributes. – jkff Dec 19 '17 at 23:12
  • Thanks @jkff. I reran the experiment with `PubsubIO.readMessagesWithAttributes().withTimestampAttribute("eventTime").withIdAttribute("uniqueID").fromSubscription(options.getPubsubSubscription())`, logged all `uniqueID`s, and piped them into a file. Running `sort myuniqueids | uniq -c | sort -r | head`, I was able to identify two uniqueID values that appeared twice after a few minutes. – Devon Peticolas Dec 20 '17 at 00:05
  • Thanks. What runner are you using? How far apart in time are the duplicated messages? – jkff Dec 20 '17 at 00:14
  • I know it's been a couple of months, but I wanted to follow up with some final details for you or anyone else who might find this. This was using the GCP Dataflow runner. When running on GCP (as opposed to the local runner), duplicate messages were fairly sporadic, with long stretches of time between them; I think the `uniqueID` attribute dealt with most but not all of them. In the end, I mitigated the issue by throwing out the duplicates I came across in my `DoFn`. Thank you for the help @jkff. – Devon Peticolas Apr 13 '18 at 15:23
  • I experience the same thing (using the Beam Python SDK v2.11, though): sporadically, messages with a duplicate `message_id` attribute DO get read into the pipeline, especially when I stop and then restart the publisher application. Let me know if you need more details; I can upload the whole code (a simple test MVP) to GitHub. – Vibhor Jain Mar 16 '19 at 20:24
  • Here's my SO post on the same issue, with further details: https://stackoverflow.com/questions/55194499/beam-dataflow-readfrompubsubid-label-unexpected-behavior – Vibhor Jain Mar 16 '19 at 21:08
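The debugging step that settled things in the comments was logging the uniqueID attribute immediately after the read (which requires `readMessagesWithAttributes()`) and counting repeats with `sort | uniq -c`. A plain-Java equivalent of that counting step might look like the sketch below, assuming the logged IDs have already been collected into a list; `DuplicateIdReport` and `repeated` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DuplicateIdReport {
  private DuplicateIdReport() {}

  // Returns uniqueID -> count for every ID logged more than once,
  // roughly the lines of `sort ids | uniq -c` whose count is above 1.
  static Map<String, Long> repeated(List<String> loggedIds) {
    Map<String, Long> counts = new TreeMap<>();
    for (String id : loggedIds) {
      counts.merge(id, 1L, Long::sum);
    }
    counts.values().removeIf(n -> n < 2); // drop IDs seen only once
    return counts;
  }

  public static void main(String[] args) {
    List<String> ids = Arrays.asList("a:x:1", "a:x:2", "a:x:1", "b:y:1");
    System.out.println(repeated(ids)); // {a:x:1=2}
  }
}
```

A non-empty result after reading with `withIdAttribute` is the kind of evidence described above: the same uniqueID made it into the pipeline more than once.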