
I have a branching pipeline with multiple ParDo transforms whose outputs are merged and written as text records to a GCS bucket.

I am receiving the following messages after my pipeline crashes:

  • The worker lost contact with the service.
  • RuntimeError: FileNotFoundError: [Errno 2] Not found: gs://MYBUCKET/JOBNAME.00000-of-00001.avro [while running 'WriteToText/WriteToText/Write/WriteImpl/WriteBundles/WriteBundles']

It looks like it can't find the file it's been writing to. The pipeline seems fine until a certain point, when the error occurs. I'd like to wrap a try/except around it or set a breakpoint, but I'm not even sure how to discover the root cause.

Is there a way to just write a single file? Or to only open a file for writing once? It's spamming thousands of output files into this bucket, which I'd like to eliminate and which may be a factor.

with beam.Pipeline(argv=pipeline_args) as p:
    csvlines = (
        p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
          | 'Parse CSV to Dictionary' >> beam.ParDo(Split())
          | 'Read Files into Memory' >> beam.ParDo(DownloadFilesDoFn())
          | 'Windowing' >> beam.WindowInto(window.FixedWindows(20 * 60))
    )

    b1 = ( csvlines | 'Branch1' >> beam.ParDo(Branch1DoFn()) )
    b2 = ( csvlines | 'Branch2' >> beam.ParDo(Branch2DoFn()) )
    b3 = ( csvlines | 'Branch3' >> beam.ParDo(Branch3DoFn()) )
    b4 = ( csvlines | 'Branch4' >> beam.ParDo(Branch4DoFn()) )
    b5 = ( csvlines | 'Branch5' >> beam.ParDo(Branch5DoFn()) )
    b6 = ( csvlines | 'Branch6' >> beam.ParDo(Branch6DoFn()) )

    output = (
        (b1, b2, b3, b4, b5, b6) | 'Merge PCollections' >> beam.Flatten()
        | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
    )
  • Regarding your error, could you share the whole code so I can inspect it? I need to know what is happening in the functions you wrote. Lastly, Apache Beam writes the output in multiple files by default, following the desired prefix. However, if you want just one file as output you can set it with `num_shards` within the `WriteToText` method; [here](https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText) is the documentation for it. – Alexandre Moraes Dec 01 '20 at 10:45
  • thanks @AlexandreMoraes - the code is show in this question here: https://stackoverflow.com/questions/65057273/how-to-handle-operations-on-local-files-over-multiple-pardo-transforms-in-apache – lys Dec 01 '20 at 10:49
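A minimal sketch of the `num_shards` suggestion from the comment above, applied to the write step shown in the question (this is not the original code; note that `beam.io.WriteToText` is itself a transform and can be applied directly, without wrapping it in `beam.io.Write`):

output = (
    (b1, b2, b3, b4, b5, b6)
    | 'Merge PCollections' >> beam.Flatten()
    # num_shards=1 forces a single output file, at the cost of funnelling the
    # final write through a single worker.
    | 'WriteToText' >> beam.io.WriteToText(known_args.output, num_shards=1)
)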

1 Answer


This question is linked to this previous question, which contains more detail about the implementation. The solution suggested there was to create an instance of google.cloud.storage.Client() in the start_bundle() of each ParDo(DoFn). That client connects to the same GCS bucket given via the args in WriteToText(known_args.output):

class DownloadFilesDoFn(beam.DoFn):
    def __init__(self):
        # Compile the regex once; it is pickled with the DoFn and reused on the workers.
        import re
        self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')

    def start_bundle(self):
        # A new storage client is created for every bundle this DoFn processes.
        import google.cloud.storage
        self.gcs = google.cloud.storage.Client()

    def process(self, element):
        # Split the gs:// URL into bucket name and object path, then download the blob.
        self.file_match = self.gcs_path_regex.match(element['Url'])
        self.bucket = self.gcs.get_bucket(self.file_match.group(1))
        self.blob = self.bucket.get_blob(self.file_match.group(2))
        self.f = self.blob.download_as_bytes()

It's likely that the cause of this error is related to having too many client connections open. I'm not clear on good practice for this, since it's been suggested elsewhere that setting up network connections per bundle in this way is fine.

Adding the following to remove the client object at the end of each bundle should help close some unnecessary lingering connections:

    def finish_bundle(self):
        # Drop the per-bundle client so its connections can be cleaned up. The compiled
        # regex from __init__ is kept, since later bundles on the same instance still need it.
        del self.gcs
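For what it's worth, here is a minimal sketch (my own addition, not from the linked question) of an alternative that keeps the connection count lower: on Beam SDKs that support DoFn.setup() and teardown(), the client can be created once per DoFn instance rather than once per bundle. The class name and the 'Bytes' field are hypothetical, just for illustration.

import re
import apache_beam as beam

class DownloadFilesPerWorkerDoFn(beam.DoFn):  # hypothetical name
    # Compiled once at import time and shared by all instances of the class.
    GCS_PATH_REGEX = re.compile(r'gs://([^/]+)/(.*)')

    def setup(self):
        # Runs once per DoFn instance on a worker, so the client and its
        # connection pool are reused across bundles instead of rebuilt each time.
        import google.cloud.storage
        self.gcs = google.cloud.storage.Client()

    def process(self, element):
        match = self.GCS_PATH_REGEX.match(element['Url'])
        blob = self.gcs.get_bucket(match.group(1)).get_blob(match.group(2))
        # Attaching the downloaded bytes to the element is an assumption about
        # what the downstream transforms expect.
        element['Bytes'] = blob.download_as_bytes()
        yield element

    def teardown(self):
        # Runs when the instance is discarded; release the client here.
        del self.gcs

start_bundle()/finish_bundle() run once per bundle, while setup()/teardown() run once per DoFn instance, so the per-instance variant opens far fewer connections when many small bundles are processed.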