I'm trying to read from pub/sub with the following code

Read<String> pubsub = PubsubIO.<String>read()
        .topic("projects/<projectId>/topics/<topic>")
        .subscription("projects/<projectId>/subscriptions/<subscription>")
        .withCoder(StringUtf8Coder.of())
        .withAttributes(new SimpleFunction<PubsubMessage,String>() {
    @Override
    public String apply(PubsubMessage input) {
        LOG.info("hola " + input.getAttributeMap());
        return new String(input.getMessage());
    }
});
PCollection<String> pps = p.apply(pubsub)
        .apply(
                Window.<String>into(
                    FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("hola amigo "+c.element());
        c.output(c.element());
    }
  }));

Compared with what I receive in Node.js, I only get the message payload, i.e. what would be contained in the data field. How can I get the ackId field (which I could later use to acknowledge the message)? The attribute map that I'm printing is null. Is there some other way to acknowledge all messages without having to figure out the ackId?

Sam McVeety
njLT
  • I'm using v0.6.0 – njLT May 16 '17 at 14:01
  • Which runner are you using? I believe that `PubsubIO.read()` should be acknowledging messages for you after they are successfully processed -- are you sure it is necessary for you to acknowledge them yourself? – Ben Chambers May 16 '17 at 17:38
  • I'm using flink-runner. It didn't seem like the messages were getting acknowledged, but I'll check again. – njLT May 17 '17 at 05:49
  • I checked again, the messages are definitely not getting acked. But I was wrong to assume that the ackId would be in the attributes - the attribute map value is correct. So I just need to know how to get my messages acknowledged. – njLT May 17 '17 at 06:17
  • The `PubsubIO` reader is responsible for acknowledging messages. I believe it is tied to the checkpointing behavior of the runner. Specifically, the source will only acknowledge when the elements read out have been checkpointed. How have you configured the checkpointing behavior of the flink-runner? – Ben Chambers May 17 '17 at 16:21
  • Thanks, I didn't know that I would have to specify a checkpoint interval. My args now look like `--runner=FlinkRunner --checkpointingInterval=15000 --streaming=true`, but I'm getting `Checkpoint triggering task Source: ... is not being executed at the moment. Aborting checkpoint.`. Is it related to https://issues.apache.org/jira/browse/FLINK-2491 or am I still missing something? I did try different checkpointing intervals. – njLT May 18 '17 at 07:58
  • @BenChambers Thanks. The checkpointing option worked for me, in a similar issue. The messages start getting ack'ed after I set it. I had logged an issue in Beam's JIRA, will update that with this info so that the relevant Beam docs can be updated. – talonx Jun 08 '18 at 09:20

1 Answer

The PubsubIO reader is responsible for acknowledging messages. It is tied to the checkpointing behavior of the runner. Specifically, the source will only acknowledge messages when the resulting elements have been checkpointed.

In this case, you should look at when the Flink runner checkpoints the state of that source. I believe this is controlled by the Flink configuration for checkpoint frequency.
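
To make the dependency on checkpointing concrete, here is a toy model of the idea, not Beam's or Flink's actual code. The class `ToySource` and its methods `read` and `finalizeCheckpoint` are hypothetical names invented for this sketch. It only illustrates that ackIds are held back until a checkpoint completes, so a pipeline that never checkpoints never acks.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: none of these classes exist in Beam or Flink. */
class AckOnCheckpointSketch {

    /** Hypothetical stand-in for an unbounded Pub/Sub source. */
    static class ToySource {
        private final List<String> pendingAckIds = new ArrayList<>();
        private final List<String> ackedIds = new ArrayList<>();

        /** Emit a message downstream; remember its ackId but do not ack yet. */
        void read(String ackId) {
            pendingAckIds.add(ackId);
        }

        /** Called by the (hypothetical) runner once a checkpoint is durable. */
        void finalizeCheckpoint() {
            ackedIds.addAll(pendingAckIds);
            pendingAckIds.clear();
        }

        /** Defensive copies so callers cannot mutate internal state. */
        List<String> pending() { return new ArrayList<>(pendingAckIds); }
        List<String> acked() { return new ArrayList<>(ackedIds); }
    }

    public static void main(String[] args) {
        ToySource source = new ToySource();
        source.read("ack-1");
        source.read("ack-2");
        // No checkpoint has happened yet, so nothing has been acked.
        System.out.println("acked before checkpoint: " + source.acked());
        source.finalizeCheckpoint();
        // Only now are the buffered ackIds acknowledged.
        System.out.println("acked after checkpoint: " + source.acked());
    }
}
```

In the comments on the question, the checkpoint interval was set on the Flink runner with `--checkpointingInterval=15000`, and another user reported that messages started getting acked once that option was set.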

Ben Chambers