
I'm running an Apache Beam pipeline that reads text files from Google Cloud Storage, performs some parsing on those files, and then writes the parsed data to BigQuery.

Ignoring the parsing and google_cloud_options here for the sake of keeping it short, my code is as follows (apache-beam 2.5.0 with GCP add-ons, Dataflow as the runner):

import apache_beam as beam
from apache_beam import Pipeline

p = Pipeline(options=options)

lines = (p
         | 'read from file' >> beam.io.ReadFromText('some_gcs_bucket_path*')
         # parsing step omitted here, as mentioned above
         | 'write to bq' >> beam.io.WriteToBigQuery(
             'my_table',
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

p.run()

This runs fine and successfully appends the relevant data to my BigQuery table for a small number of input files. However, when I ramp the number of input files up to roughly 800k, I get an error:

"Total size of the BoundedSource objects returned by BoundedSource.split() operation is larger than the allowable limit."

I found Troubleshooting apache beam pipeline import errors [BoundedSource objects is larger than the allowable limit], which recommends using ReadAllFromText instead of ReadFromText.
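Simplified, the swap boils down to roughly this (ReadAllFromText takes no file pattern argument, so it ends up applied directly to the pipeline object):

lines = (p
         | 'read from file' >> beam.io.ReadAllFromText()
         # parsing and BigQuery write unchanged
         )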
With that swap in place, I get the following error:

Traceback (most recent call last):
  File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 240, in <module>
    xmltobigquery.run_dataflow()
  File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 220, in run_dataflow
    'parse xml to dict' >> beam.ParDo(XmlToDictFn(), job_spec=self.job_spec) | \
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 831, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 488, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/textio.py", line 470, in expand
    return pvalue | 'ReadAllFiles' >> self._read_all_files
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
    label or transform.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 416, in expand
    | 'ReadRange' >> ParDo(_ReadRange(self._source_from_file)))
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
    label or transform.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 568, in expand
    | 'RemoveRandomKeys' >> Map(lambda t: t[1]))
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 494, in expand
    windowing_saved = pcoll.windowing
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
    self.producer.inputs)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
    self.producer.inputs)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
AttributeError: 'PBegin' object has no attribute 'windowing'. 

Any suggestions?

  • Please also share the final version of your code that gives this error. This way we will have the actual context. – A.Queue Sep 24 '18 at 15:01
  • Additionally I have found [this](https://stackoverflow.com/questions/46564730/google-dataflow-read-from-spanner) answer mentioning the same error and that adding `PColl` fixed it. – A.Queue Sep 24 '18 at 15:05
  • Turns out that when using ReadAllFromText one should also add a Create step (not automatically included as with the ReadFromText example). This sorted out my problem, thanks. – Richardt Benade REZCO Sep 25 '18 at 15:31
  • @richardt-benade-rezco, cool! Feel free to share what your code looks like in the end. – A.Queue Sep 25 '18 at 15:33
  • @Richardt Benade REZCO could you please post the solution and the working code for the benefit of the community as an answer? – Philipp Sh Oct 08 '18 at 08:14

2 Answers


I was facing the same issue. As Richardt mentioned, beam.Create has to be called explicitly. An additional challenge is how to use this pattern together with template parameters, because beam.Create only supports in-memory data, as described in the documentation.

Google Cloud Support helped me in this case and I want to share the solution with you. The trick is to create the pipeline with a dummy string and then use a mapping lambda to read the input at runtime (template parameters are ValueProviders, whose .get() can only be called at execution time, inside a transform):

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class AggregateOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            help='Path of the files to read from')
        parser.add_value_provider_argument(
            '--output',
            help='Output files to write results to')


def run():
    logging.info('Starting main function')

    pipeline_options = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_options)
    options = pipeline_options.view_as(AggregateOptions)

    steps = (
            pipeline
            | 'Create' >> beam.Create(['Start'])  # dummy element to kickstart the pipeline
            | 'Read Input Parameter' >> beam.Map(lambda x: options.input.get())  # resolve the real input pattern at runtime
            | 'Read Data' >> beam.io.ReadAllFromText()
            # ... other steps
    )

Hope this answer is helpful.

philsch
  • And where do you specify the input/output files? As a parser argument when you run the pipeline script? – Luiscri Jun 24 '21 at 11:50
  • @Luiscri For testing purposes, you can add the arguments when running the code locally (see the sketch after these comments). However, for running in GCloud Dataflow, generate a [custom template](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates) by creating a metadata file and running the described python command. Upload the metadata file with a postfix `_metadata` to the same Cloud Storage location where the template has been saved. The template is then [selectable in Dataflow](https://cloud.google.com/dataflow/docs/guides/templates/running-templates). – philsch Jun 26 '21 at 08:04
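A minimal sketch of passing these parameters for a local test run (assuming the AggregateOptions class from the answer above; the bucket paths are placeholders):

# Option 1: let PipelineOptions pick the flags up from the command line, e.g.
#   python main.py --input 'gs://my-bucket/input/*' --output 'gs://my-bucket/out/'
# Option 2: pass the flags explicitly in code:
pipeline_options = PipelineOptions([
    '--input', 'gs://my-bucket/input/*',
    '--output', 'gs://my-bucket/out/',
])
options = pipeline_options.view_as(AggregateOptions)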

To answer the original question: ReadFromText takes a file pattern as an argument, while ReadAllFromText takes its file patterns as pipeline input:

# ReadFromText
(p
 | beam.io.ReadFromText("myfile.csv"))

# ReadAllFromText
(p
 | beam.Create(["myfile1.csv", "myfile2.csv", "myfile3.csv"])
 | beam.io.ReadAllFromText())
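Applied to the pipeline from the original question, the fix would look roughly like this (a sketch assuming the same table and write dispositions as in the question; the parsing step is still elided):

p = Pipeline(options=options)

(p
 | 'file patterns' >> beam.Create(['some_gcs_bucket_path*'])
 | 'read from files' >> beam.io.ReadAllFromText()
 # parsing step omitted, as in the question
 | 'write to bq' >> beam.io.WriteToBigQuery(
     'my_table',
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

p.run()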
bfontaine