
I'm trying to join two unbounded sources using the Apache Beam Java SDK. While joining, I'm getting the error message below.

Exception in thread "main" java.lang.UnsupportedOperationException: Joining unbounded PCollections is currently only supported for non-global windows with triggers that are known to produce output once per window, such as the default trigger with zero allowed lateness. In these cases Beam can guarantee it joins all input elements once per window. WindowingStrategy{windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowedLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute)), accumulationMode=DISCARDING_FIRED_PANES, timestampCombiner=EARLIEST} is not supported
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.access$1500(BeamJoinRel.java:98)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:330)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:308)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:67)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:48)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:49)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:65)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:100)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:167)
    at xyz.xyz.main(xyz.java:64)

I have tried using both fixed and sliding windows, along with triggering (pastEndOfWindow and pastFirstElementInPane) with zero allowed lateness, and I have tried both accumulating and discarding fired panes. I get the same error message every time.

Below are two snippets I tried, one with a fixed and one with a sliding window.

p1.apply("window",
    Window
      .<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .withAllowedLateness(Duration.ZERO)
      .accumulatingFiredPanes());
p1.apply("window2",
    Window.<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(5)))
      .triggering(
        Repeatedly
          .forever(
             AfterProcessingTime
               .pastFirstElementInPane()
               .plusDelayOf(Duration.standardMinutes(1))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

I simply want to implement a SQL transform with a sliding window, a trigger with a delay, and allowed lateness. Kindly guide me on how to implement it.

Thanks, Gowtham

  • Here is the answer to a similar question, which may be helpful: https://stackoverflow.com/a/61525992/2849811 – Onkar Apr 30 '20 at 14:55

2 Answers


As of now (2.13.0), Beam SQL does not support joining unbounded PCollections to unbounded PCollections with non-default triggers. Only the default trigger is allowed for such joins (so there will be only one result emitted per window).

The main reason is that the current Beam Java SDK implementation is missing a mechanism (called retracting and accumulating) to refine data in cases like Join.
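
For reference, a minimal sketch of the supported pattern, per the error message: window both inputs identically into a non-global window and leave the default trigger in place (no .triggering(...) call), then join with SqlTransform. The variables stream1/stream2 are assumed to be unbounded PCollection<Row>s with schemas, and the table names and id/val fields are made up for illustration:

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

// Window both inputs identically; the absence of .triggering(...) means the
// default trigger applies: one firing per window, zero allowed lateness.
PCollection<Row> left = stream1.apply("windowLeft",
    Window.<Row>into(
        SlidingWindows.of(Duration.standardMinutes(30))
            .every(Duration.standardSeconds(30))));
PCollection<Row> right = stream2.apply("windowRight",
    Window.<Row>into(
        SlidingWindows.of(Duration.standardMinutes(30))
            .every(Duration.standardSeconds(30))));

// The tuple tag ids become the table names visible to the SQL query.
PCollection<Row> joined =
    PCollectionTuple.of(new TupleTag<Row>("A"), left)
        .and(new TupleTag<Row>("B"), right)
        .apply(SqlTransform.query(
            "SELECT A.id, A.val, B.val FROM A JOIN B ON A.id = B.id"));

Each firing then contains the complete join result for that window, and late data is dropped.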

Rui Wang
  • This is correct. To go slightly deeper, the underlying reason is that JOIN is essentially a grouping operation, which means that it has to collect the input data and then process it (e.g. match the rows from the left side of the join to the rows on the right side of the join). In the case of unbounded (streaming) sources, one way to do it is to wait until the window contents of both inputs are emitted and then join them per window. This is straightforward if you have a non-global window and a single trigger firing, which means that your pipeline only emits each window once and drops late data. – Anton Jul 08 '19 at 18:04
  • Now if you want to handle late data and multiple trigger firings, the question is: if you emit it in accumulating mode, how do you reprocess the newly emitted window contents? You have already emitted some join results when the trigger fired last time; do you want to emit the same output plus new data once again? And in discarding mode it's even worse: you don't have all of the data in one of the inputs, so inner joins will not produce correct results out of the box. – Anton Jul 08 '19 at 18:07
  • In a language like Java you can handle it by manually tracking the data in a state cell and handling the join logic according to your business needs (e.g. take a look at this example: https://github.com/apache/beam/blob/b8aa8486f336df6fc9cf581f29040194edad3b87/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java#L155 ). But SQL lacks this kind of capability; you cannot describe how to handle late data in pure SQL. – Anton Jul 08 '19 at 18:09
  • As Rui has mentioned, retractions would be one way to implement correct join semantics in SQL in the general case. This mechanism basically allows a `PTransform` to retract the previous output, and then emit the updated result. So in the case of a join, the pipeline would say: undo the whole output of the previous join, and re-emit the new one with the extra data that just came in. This has to be implemented in the framework and doesn't exist currently in Beam (and Beam SQL specifically), so you are out of luck here, unfortunately. – Anton Jul 08 '19 at 18:13
  • Here's a link that explains a bit about Beam SQL JOIN: https://beam.apache.org/documentation/dsls/sql/extensions/joins/ , and more detailed info about `GroupByKey`/`CoGroupByKey`, which is how Beam SQL joins are implemented: https://beam.apache.org/documentation/programming-guide/#groupbykey – Anton Jul 08 '19 at 18:15
  • Thanks for your answers, Rui & Anton. Specifically, the question from @Anton about accumulating & discarding mode gave me a deeper understanding. As for my use case, I want to join a couple of streams and I expect some late data. If I use a sliding window with a duration of 30 minutes and a period of 30 seconds, will there be a trigger every 30 seconds? Technically, it's still one trigger per window, as every 30 seconds a new window gets created. You guys are awesome. Thanks. – Gowtham Jul 09 '19 at 04:17
  • If you use a sliding window and a default trigger, with zero allowed lateness, it will accumulate the events for each 30 minutes in a sliding manner and emit them once per window; it will work. It might not necessarily emit them every 30 real seconds (windows are defined in terms of event time, so for example there's a watermark that is controlled by the source and can cause delays), but each window will contain only elements that have timestamps that belong to the same 30-minute interval, and the windows will be 30 seconds apart from each other in terms of the timestamps of the elements in them. – Anton Jul 09 '19 at 15:39
  • When they will actually get emitted will be decided by the runner; the trigger only allows it to emit data, but doesn't force it to do so at any specific point in time. And the default trigger will allow the data to be emitted after the watermark has passed the end of the window, meaning that the source thinks it has seen all of the events that have timestamps belonging to the specific window, and no other elements with timestamps within the window will ever appear. If they do, they will be dropped. – Anton Jul 09 '19 at 15:41
  • @Anton Is there a way to view the value of the watermark? Basically, I have 2 PCollections of strings (CSV) and I'm trying to join them in near real time, i.e. in less than 5 secs. And if there can be a delay in the arrival of a record, a max of 30 mins can be allowed. So I want data to be emitted every 30 secs, and if one topic receives a record early, it has to wait until the corresponding record comes, up to a max of 30 mins, and if it doesn't come even then, it should be dropped. How to implement this in Beam? Your help is highly appreciated. Please post this as an answer, as this is what I really want. – Gowtham Jul 10 '19 at 04:22

From the comment, if I understand it correctly, the desired behavior is:

  • join two streams;
  • emit results every 30 seconds in real world time;
  • if the data cannot be matched, wait for the corresponding matching record for 30 min max;
  • drop the records after 30 min;

Basically it's a kind of continuous sliding matching over the last 30 min of data in both streams, with results emitted every 30 seconds.

Good news is that it should be possible to implement this in Beam Java (probably in Python as well). Bad news is that it would probably be non-trivial in Java, and I don't think it's possible at all in SQL at the moment.

What it would probably look like (a rough sketch follows the list):

  • input should be in global window;
  • have a stateful ParDo which keeps track of all seen elements by storing them in a state cell:
    • you will probably need to use either a side-input or apply a CoGroupByKey beforehand to have access to elements from both inputs in the same ParDo;
    • side-inputs and CoGroupByKey have different semantics and might not be easy to work with;
  • on each input manually check the state for the matching records;
  • either emit results right away or keep them in another state cell;
  • have a timer that would purge old unmatched records:
    • you might need to manually keep track of timestamps and other things;
  • apply desired window/trigger to the output if needed;

I suggest you read through the Query3 example linked in my comment above; it does the timer and state part of what you need (it waits for matching records, keeps the unmatched records in the state, and clears the state on timer firing) and uses a CoGroupByKey. You might have a better idea of how it works after you understand this example.

Anton