
I'm trying, with Apache Beam 2.1.0, to consume simple (key, value) data from Google Pub/Sub and group it by key so that I can process batches of data.

With the default trigger, my code after the GroupByKey never fires (I waited 30 minutes). If I define a custom trigger, the code executes, but I would like to understand why the default trigger never fires. I tried defining my own timestamps with withTimestampLabel, but hit the same issue. I also tried changing the window duration (1 second, 10 seconds, 30 seconds, etc.), with the same result.
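
For example, a custom trigger along these lines makes the downstream code fire (a sketch; the 10-second delay is illustrative, not necessarily the exact value I used):

.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
    // Fire repeatedly, 10 seconds (processing time) after the first element in each pane
    .triggering(Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(10))))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())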

I used the following commands to insert test data:

gcloud beta pubsub topics publish test A,1
gcloud beta pubsub topics publish test A,2
gcloud beta pubsub topics publish test B,1
gcloud beta pubsub topics publish test B,2

The documentation says that we can do one or the other, but not necessarily both:

If you are using unbounded PCollections, you must use either non-global windowing OR an aggregation trigger in order to perform a GroupByKey or CoGroupByKey
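
My pipeline uses non-global windowing (the FixedWindows in the code below), so according to that quote the default trigger should suffice. For reference, the other option, keeping the global window but adding a trigger, would look roughly like this (a sketch with an illustrative element-count trigger):

.apply(Window.<String>into(new GlobalWindows())
    // The global window never closes, so fire after every element instead
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())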

It looks similar to:

  1. Consuming unbounded data in windows with default trigger
  2. Scio: groupByKey doesn't work when using Pub/Sub as collection source

My code:

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

static class Compute extends DoFn<KV<String, Iterable<Integer>>, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Never executes with the default trigger
        System.out.println("KEY:" + c.element().getKey());
        System.out.println("NB:" + c.element().getValue().spliterator().getExactSizeIfKnown());
    }
}

public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    p.apply(PubsubIO.readStrings().fromSubscription("projects/" + args[0] + "/subscriptions/test"))
     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
     .apply(
        MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
            .via((String row) -> {
                String[] parts = row.split(",");
                System.out.println(Arrays.toString(parts)); // Code fires
                return KV.of(parts[0], Integer.parseInt(parts[1]));
            })
     )
     .apply(GroupByKey.create())
     .apply(ParDo.of(new Compute()));

    p.run();
}
harscoet
  • How are you determining that it never fires? What runner are you executing with? – Ben Chambers Nov 08 '17 at 18:52
  • My logs never appear, even after waiting 8 hours. I tried with both DirectRunner and DataflowRunner (job ID: 2017-11-08_03_07_26-15172621167075751505). In the Dataflow UI graph I can see my job has been running for 8 hours; the GroupByKey step info shows Input collections -> Elements added -> 14, and Output collections -> Elements added -> - (nothing). – harscoet Nov 08 '17 at 19:03
  • That is odd. Are you sure the subscription exists and is properly configured? Do you have sufficient Pub/Sub quota for pulling the messages in? Is the MapElements function throwing an exception (e.g., if the integer isn't properly formatted)? You could try catching NumberFormatException or whatnot. You might also try not using the spliterator methods in `Compute` (that shouldn't cause the problem, but might). You might also consider using an SLF4J logger as described [here](https://cloud.google.com/dataflow/pipelines/logging); System.out may not be getting flushed. – Ben Chambers Nov 08 '17 at 22:01
  • 1
    Any luck figuring this out? – Ben Chambers Nov 29 '17 at 23:24
  • I have the same issue with 2.3.0: GroupByKey hangs with the DirectRunner but works fine in Dataflow. Did you solve it? – Gal Ben-Haim Mar 21 '18 at 22:45
  • Since you are trying to use a trigger based on window duration, I would make sure each element in the collection has a proper timestamp. The grouping and window triggers won't work if the timestamps don't fit the window currently being processed. To check, you can execute locally and debug while reading the real Pub/Sub messages, or log the relevant info to the console. You could separate the steps: 1) read from Pub/Sub; 2) in processElement(ProcessContext c, BoundedWindow w), print the window info. If the timestamps are not OK, assign a timestamp yourself and check again. – aleph Sep 11 '18 at 17:31
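
A minimal sketch of the debugging step aleph describes above (illustrative code, not part of the original pipeline): a pass-through DoFn that logs each element's event-time timestamp and its assigned window. It assumes org.apache.beam.sdk.transforms.windowing.BoundedWindow is imported and would be applied between the MapElements and the GroupByKey:

static class LogWindowInfo extends DoFn<KV<String, Integer>, KV<String, Integer>> {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow w) {
        // Print the element, its event-time timestamp, and the window it was assigned to
        System.out.println("ELEMENT: " + c.element()
                + " TIMESTAMP: " + c.timestamp()
                + " WINDOW: " + w);
        c.output(c.element()); // Pass the element through unchanged
    }
}

If the printed timestamps fall outside the expected one-minute windows, that would explain why the downstream GroupByKey output never appears.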

0 Answers