
Using Python 3.9 and Apache Beam 2.38.0, the minimal working example below works fine.

However, with Apache Beam 2.39.0 (or 2.44.0), the example fails with `AssertionError: A total of 2 watermark-pending bundles did not execute.` When I switch the logging level to DEBUG, I see messages of the form `Unable to add bundle for stage` for the two bundles, along with `Stage input watermark: Timestamp(-9223372036854.775000)` (i.e. `timestamp.MIN_TIMESTAMP`) and `Bundle schedule watermark: Timestamp(9223372036854.775000)` (i.e. `timestamp.MAX_TIMESTAMP`).

import logging
import apache_beam as beam


def setup_logging():
    log_format = '[%(asctime)-15s] [%(name)s] [%(levelname)s]: %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    logging.info("Pipeline Started")


class CreateKvPCollectWithSideInputDoFn(beam.DoFn):
    def __init__(self):
        super().__init__()

    def process(self, element, side_input):
        print(f"side_input_type: {type(side_input)}")
        yield "b", "2"


class CreateKvPCollectDoFn(beam.DoFn):
    def __init__(self):
        super().__init__()

    def process(self, element):
        yield "a", "1"


def main():
    setup_logging()

    pipeline = beam.Pipeline()

    pcollect_input = (
        pipeline
        | "Input/Create" >> beam.Create(["input"])
    )

    kvpcollect_1 = (
        pcollect_input | "PCollection_1" >> beam.ParDo(CreateKvPCollectDoFn())
    )
    beamdict_1 = beam.pvalue.AsDict(kvpcollect_1)

    kvpcollect_2 = (
        pcollect_input
        | "PCollection_2" >> beam.ParDo(
            CreateKvPCollectWithSideInputDoFn(), side_input=beamdict_1
        )
    )

    kvpcollect_3 = (
        (kvpcollect_1, kvpcollect_2)
        | "Flatten" >> beam.Flatten()
    )
    beamdict_3 = beam.pvalue.AsDict(kvpcollect_3)

    (
        pcollect_input
        | "UseBeamDict_3" >> beam.ParDo(CreateKvPCollectWithSideInputDoFn(), side_input=beamdict_3)
        | "PrintResult" >> beam.Map(print)
    )

    result = pipeline.run()
    result.wait_until_finish()


if __name__ == '__main__':
    main()

I would like to understand why this error is triggered on Apache Beam Python versions later than 2.38.0, and whether there is a way to avoid it.

Francis
  • I have found that the example passes if I add `| "Reshuffle_2" >> beam.Reshuffle()` after the `"PCollection_2"` step. However, this does not really help: in the real pipeline I am working on, which runs fine with Apache Beam 2.38.0, Reshuffle steps would have to be added in many places just to make it pass on the DirectRunner with later Apache Beam versions. It would still be good to understand the change in behaviour. – Francis Apr 08 '23 at 21:03
  • Opened an issue on this at https://github.com/apache/beam/issues/26190 to check if this change in behaviour is expected. – Francis Apr 10 '23 at 11:14
