
I am quite new to working with Dataflow (GCP). I built a pipeline that runs faster in DirectRunner mode than in DataflowRunner mode, and I don't know how it can be improved. The pipeline reads data from multiple tables in BigQuery and writes a CSV file, and it receives dates as execution parameters to filter the query:

import argparse
import datetime
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# PROJECT_ID, REGION, JOB_NAME and STORAGE are module-level constants (values omitted here).
def get_pipeline_options(pipeline_args):
    # Build PipelineOptions from the fixed settings plus any extra command-line arguments.
    pipeline_args = ['--%s=%s' % (k, v) for (k, v) in {
        'project': PROJECT_ID,
        'region': REGION,
        'job_name': JOB_NAME,
        'staging_location': STORAGE + 'STAGING_DIRECTORY',
        'temp_location': STORAGE + 'TEMP_DIRECTORY',
    }.items()] + pipeline_args
    options = PipelineOptions(pipeline_args)
    return options

class Reader(beam.DoFn):
  """DoFn that runs the BigQuery query once and emits its result rows."""

  def __init__(self, fechaIni, fechaFin):
    # fechaIni / fechaFin are ValueProvider arguments resolved at execution time.
    self.fechaIni = fechaIni
    self.fechaFin = fechaFin

  def process(self, text):
    # Imports are done inside process() so they are available on Dataflow workers.
    from google.cloud import bigquery
    from datetime import datetime

    dateIni = self.fechaIni.get()
    dateEnd = self.fechaFin.get()

    query = """
        #A huge query from multiple tables with joins, filtered by dateIni and dateEnd
     """
    client = bigquery.Client()
    query_job = client.query(query)
    result_fields = query_job.result()

    # Each row of the query result is emitted as an element of the output PCollection.
    return result_fields
  


class CampaignOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Exposed as ValueProviders so the dates can be supplied at execution time.
        parser.add_value_provider_argument('--fechaIni', type=str)
        parser.add_value_provider_argument('--fechaFin', type=str)

def run(argv=None, save_main_session=True):
    """The main function which creates the pipeline and runs it."""
    parser = argparse.ArgumentParser()
 
    parser.add_argument(
      '--output',
      dest='output',
      default='gs://mybucket/input_'+datetime.datetime.now().strftime('%Y%m%d'),
      help='Output files.')


    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        '--project=myproject',
        '--staging_location=gs://mybucket',
        '--region=us-central1',
        '--temp_location=gs://mybucket',
        '--job_name=myjob'
    ])
    pipeline_options = PipelineOptions(pipeline_args)

    campaign_options = pipeline_options.view_as(CampaignOptions)

    with beam.Pipeline(options=campaign_options) as pipeline:
        r = (
          pipeline
          | 'Initialize' >> beam.Create([':-)'])
          | 'Read from BigQuery' >> beam.ParDo(Reader(campaign_options.fechaIni, campaign_options.fechaFin))
          | 'Read values' >> beam.Map(lambda x: x.values())
          | 'CSV format' >> beam.Map(lambda row: ','.join([str(column) for column in row]))
          | 'Write' >> beam.io.WriteToText(file_path_prefix=known_args.output, num_shards=1)
        )
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
  • Are you asking about parallelisation or async call options, Eglis? It's not clear what the existing performance metrics are or how much you want to improve. – demokritos Jan 05 '21 at 15:07
  • Running in DataflowRunner mode the difference is huge, about two hours. The data to query is very large, from multiple tables with inner joins, and the generated file is approximately 3 GB, but I don't know why the time difference between DataflowRunner mode and DirectRunner mode is so large. What can I do to improve the execution time? Thank you – Eglis Alvarez López Jan 05 '21 at 15:27
  • Where do you run your direct runner? On your computer? If so, what are its specs? – guillaume blaquiere Jan 05 '21 at 16:47
  • Yes, on my own computer. Specs: Core i5-10400 CPU @ 2.90 GHz, 32 GB RAM – Eglis Alvarez López Jan 05 '21 at 19:53
  • Within the code you shared, I noticed that the runner parameter is missing. Do you pass the runner option on the command line? Please note that for cloud execution this must be 'DataflowRunner', as mentioned [here](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#configuring-pipelineoptions-for-execution-on-the-cloud-dataflow-service). – Mariana Angeles Jan 06 '21 at 18:30
  • Additionally, if you don't specify some aspects like [num_workers](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-dataflow-pipeline-options) or [machine_type](https://cloud.google.com/compute/docs/machine-types), the Dataflow service will determine their values based on your job. In this case, I recommend checking [job metrics](https://cloud.google.com/dataflow/docs/guides/using-monitoring-intf#accessing_job_monitoring_charts) to see how many vCPUs and how much memory are being used, and if you need more, try changing the machine type (see the sketch after these comments). – Mariana Angeles Jan 06 '21 at 18:32
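Following the two comments above, here is a minimal sketch of how the runner and worker options could be added to the hard-coded pipeline_args in run(). The worker count and machine type below are illustrative assumptions, not recommendations; if they are omitted, the Dataflow service chooses values automatically:

    pipeline_args.extend([
        '--project=myproject',
        '--staging_location=gs://mybucket',
        '--temp_location=gs://mybucket',
        '--region=us-central1',
        '--job_name=myjob',
        '--runner=DataflowRunner',        # required for execution on the Dataflow service
        '--num_workers=4',                # illustrative value; Dataflow picks one if omitted
        '--machine_type=n1-standard-4',   # illustrative value; also aliased as --worker_machine_type
    ])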
