
I am developing an ETL pipeline for Google Cloud Dataflow where I have several branching ParDo transforms, each of which requires a local audio file. The branched results are then combined and exported as text.

This started as a Python script that ran on a single machine, which I am now attempting to adapt for parallelisation across VM workers using GC Dataflow.

The extraction process downloads the files from a single GCS bucket location and then deletes them once the transform is complete, to keep storage under control. This is because the pre-processing module requires local access to the files. It could be re-engineered to handle a byte stream instead of a file by rewriting some of the pre-processing libraries myself; however, my attempts at this aren't going well, and I'd first like to explore how to handle parallelised local file operations in Apache Beam / GC Dataflow in order to understand the framework better.

In this rough implementation each branch downloads and deletes the files, with lots of double handling: I have 8 branches, so each file is downloaded and deleted 8 times. Could a GCS bucket instead be mounted on every worker rather than downloading files from the remote?

Or is there another way to ensure workers are being passed the correct reference to a file so that:

  • a single DownloadFilesDoFn() can download a batch,
  • the local file references are then fanned out in a PCollection to all the branches,
  • and a final CleanUpFilesDoFn() can remove them afterwards?

How can you parallelise local file references? (See the rough sketch below.)

What is the best branched ParDo strategy for Apache Beam / GC Dataflow if local file operations cannot be avoided?
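
Roughly, this is the shape I have in mind. It's a hypothetical sketch only: DownloadFilesDoFn() and CleanUpFilesDoFn() don't exist yet, and sequencing the cleanup after all branches have finished is exactly the part I don't know how to do.

# hypothetical sketch - placeholder DoFns, not working code
with beam.Pipeline(argv=pipeline_args) as p:
    local_files = (
        p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
          | 'Parse CSV into Dict' >> beam.ParDo(Split())
          | 'Download batch to worker' >> beam.ParDo(DownloadFilesDoFn())  # would add a local path to each element
    )

    # fan the same local file references out to every branch
    preds1 = local_files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))
    preds2 = local_files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))

    # ...join the branch outputs and write them out as in the pipeline below...

    # somehow run this only after every branch has finished reading the files
    cleanup = local_files | 'Clean up local files' >> beam.ParDo(CleanUpFilesDoFn())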


Here is some example code of my existing implementation, with two branches for simplicity.

# singleton decorator
def singleton(cls):
  instances = {}
  def getinstance():
      if cls not in instances:
          instances[cls] = cls()
      return instances[cls]
  return getinstance

@singleton
class Predict():
  '''
  Loads a model once per worker; process() reads audio from a local
  filename and returns a prediction.
  '''
  def __init__(self, model):
    self.model = model

  def process(self, filename):
      # simplified pseudocode
      audio = preprocess.load(filename=filename)
      prediction = inference(self.model, audio)
      return prediction

class PredictDoFn(beam.DoFn):
  def __init__(self, model):
    self.localfile, self.model = "", model

  def process(self, element):
    # Construct Predict() object singleton per worker
    predict = Predict(self.model)

    # each branch downloads its own local copy of the file (cwd is defined elsewhere)
    subprocess.run(['gsutil', 'cp', element['GCSPath'], './'], cwd=cwd, shell=False)
    self.localfile = cwd + "/" + element['GCSPath'].split('/')[-1]

    res = predict.process(self.localfile)
    return [{
        'Index': element['Index'],
        'Title': element['Title'],
        'File': element['GCSPath'],
        self.model + 'Prediction': res
        }]

  def finish_bundle(self):
    # note: this only removes the most recently downloaded file in the bundle
    subprocess.run(['rm', self.localfile], cwd=cwd, shell=False)


# DoFn to split the CSV into elements (the GCS bucket could perhaps be read in as a PCollection instead)
class Split(beam.DoFn):
    def process(self, element):
        Index,Title,GCSPath = element.split(",")
        GCSPath = 'gs://mybucket/'+ GCSPath
        return [{
            'Index': int(Index),
            'Title': Title,
            'GCSPath': GCSPath
        }]

A simplified version of the pipeline:

with beam.Pipeline(argv=pipeline_args) as p:
    files = (
        p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
          | 'Parse CSV into Dict' >> beam.ParDo(Split())
    )

    # prediction 1 branch
    preds1 = files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))

    # prediction 2 branch
    preds2 = files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))

    # join branches (simplified; the outputs could also be keyed on 'Index'
    # and merged with CoGroupByKey)
    joined = (preds1, preds2) | 'Join Branches' >> beam.Flatten()

    # output to file
    output = joined | 'WriteToText' >> beam.io.WriteToText(known_args.output)

  • This looks professional to me. I think that this might already be the optimal solution. I have been using something similar but not quite as clean for years and the GCS access step is not a significant delay or source of problems. – Steven Ensslen Nov 29 '20 at 21:59
  • When I access files in GCS from Dataflow I use [`google.cloud.storage.Client(...).download_blob_to_file()`](https://googleapis.dev/python/storage/latest/client.html), which I find much cleaner than a `subprocess` call to `gsutil`. – Steven Ensslen Nov 29 '20 at 21:59
  • Thanks Steven - I'll use this instead of `gsutil` - can I ask how you handle file operations when the pipeline branches? I actually have 7+ branches in the pipeline for several terabytes of data. Currently each branch is downloading and deleting the same file. – lys Nov 30 '20 at 00:47
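
For reference, a minimal sketch of the client-based download mentioned in the comments above (the bucket name and local path here are just illustrative; `download_blob_to_file()` accepts either a `Blob` or a `gs://` URI):

from google.cloud import storage

# minimal sketch: copy one GCS object to a local file without shelling out to gsutil
client = storage.Client()
with open('/tmp/audio.wav', 'wb') as f:  # illustrative local path
    client.download_blob_to_file('gs://mybucket/audio/example.wav', f)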

1 Answer


In order to avoid downloading the files repeatedly, the contents of the files can be put into the PCollection.

class DownloadFilesDoFn(beam.DoFn):
  def __init__(self):
    # import inside the DoFn so the module is available on Dataflow workers
    import re
    self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')

  def start_bundle(self):
    import google.cloud.storage
    self.gcs = google.cloud.storage.Client()

  def process(self, element):
    # parse the gs://bucket/object path and attach the file contents to the element
    file_match = self.gcs_path_regex.match(element['GCSPath'])
    bucket = self.gcs.get_bucket(file_match.group(1))
    blob = bucket.get_blob(file_match.group(2))
    element['file_contents'] = blob.download_as_bytes()
    yield element

Then PredictDoFn becomes (note that Predict.process is now handed the raw file contents rather than a local filename, so the pre-processing step needs to accept bytes or a file-like object):

class PredictDoFn(beam.DoFn):
  def __init__(self, model):
    self.model = model

  def start_bundle(self):
    self.predict = Predict(self.model)
    
  def process(self, element):
    res = self.predict.process(element['file_contents'])
    return [{
        'Index': element['Index'], 
        'Title': element['Title'],
        'File' : element['GCSPath'],
        self.model + 'Prediction': res
        }]   

and the pipeline:

with beam.Pipeline(argv=pipeline_args) as p:
    files = (
        p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
          | 'Parse CSV into Dict' >> beam.ParDo(Split())
          | 'Read files' >> beam.ParDo(DownloadFilesDoFn())
    )

    # prediction 1 branch
    preds1 = files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))

    # prediction 2 branch
    preds2 = files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))

    # join branches (simplified; the outputs could also be keyed on 'Index'
    # and merged with CoGroupByKey)
    joined = (preds1, preds2) | 'Join Branches' >> beam.Flatten()

    # output to file
    output = joined | 'WriteToText' >> beam.io.WriteToText(known_args.output)

– Steven Ensslen
  • Wonderful Steven, thank you! This is also helping me grok the use of blobs in Dataflow too. Let me give this a shot, then I'll mark it as accepted. – lys Nov 30 '20 at 01:46
  • Hi Steven, what are your thoughts on keeping the GCS client initialization in `setup()` instead of `start_bundle()`? – Sach Nov 30 '20 at 04:15
  • I think there's a syntax error in the `re` calls - I'm getting `TypeError: 're.Pattern' object is not callable`. – lys Nov 30 '20 at 05:10
  • @Sach I think that you are correct: that ought to be in `setup()` per https://stackoverflow.com/a/50068377/7359502. I don't use `setup()` in my own work, though I can't recall why. – Steven Ensslen Nov 30 '20 at 19:55
  • I've been using a global client instance for the whole pipeline, but I'm guessing instantiating the client in `setup()` for each `DoFn` is better practice? – lys Nov 30 '20 at 21:11
  • @lys I tried using a global client instance for a whole pipeline, but that did not work once I deployed as a template. It could be that I made another error, but making the client in each `DoFn` is what I did to make it work. – Steven Ensslen Nov 30 '20 at 21:44
  • I wonder then, should the instance of the client be deleted from memory in `finish_bundle()`? How about the references to the bytes too? My pipeline is now leaking memory very badly. – lys Dec 01 '20 at 23:47
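
For reference, a minimal sketch of the `setup()`-based initialisation discussed in the comments above - this is an assumed adaptation of the answer's DoFn, not code from the answer itself:

class DownloadFilesDoFn(beam.DoFn):
  def setup(self):
    # setup() runs once per DoFn instance on the worker, so the regex and the
    # GCS client are created once rather than once per bundle; imports stay
    # inside the method, as in the answer, so they resolve on the workers
    import re
    import google.cloud.storage
    self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')
    self.gcs = google.cloud.storage.Client()

  def process(self, element):
    file_match = self.gcs_path_regex.match(element['GCSPath'])
    bucket = self.gcs.get_bucket(file_match.group(1))
    blob = bucket.get_blob(file_match.group(2))
    element['file_contents'] = blob.download_as_bytes()
    yield element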