
Suppose I have a Python Dataflow job on GCP that does the following two things:

  • Fetches some data from BigQuery

  • Calls an external API in order to get a certain value and filters the data from BigQuery based on the fetched value

I am able to do this. However, for the second step, the only approach I found was to implement it as a class that extends DoFn and apply it in parallel later:

class CallExternalServiceAndFilter(beam.DoFn):
    def to_runner_api_parameter(self, unused_context):
        pass

    def process(self, element, **kwargs):
        # here I have to make the http call and figure out whether to yield the element or not,
        # however this happens for each element of the set, as expected.
        if element['property'] < response_body_parsed['some_other_property']:
            logging.info("Yielding element")
            yield element
        else:
            logging.info("Not yielding element")

with beam.Pipeline(options=PipelineOptions(), argv=argv) as p:
    rows = p | 'Read data' >> beam.io.Read(beam.io.BigQuerySource(
        dataset='test',
        project=PROJECT,
        query='Select * from test.table'
    ))

    rows = rows | 'Calling external service and filtering items' >> beam.ParDo(CallExternalServiceAndFilter())

    # ...

Is there any way that I can make the API call only once and then use the result in the parallel filtering step?

Vee6
  • Does the external call depend on the element itself? That is, when making the HTTP call to get response_body_parsed, do you use any property from the element itself? – Ankur Aug 20 '19 at 18:29
  • In the specific case I had, there was no dependency on the individual element. Let's consider it a completely separate step that doesn't need to be run in parallel. – Vee6 Aug 21 '19 at 14:48
  • In that case you can use a side input to make the HTTP call once and use the result in the ParDo without making the call multiple times. Reference: https://beam.apache.org/documentation/programming-guide/#side-inputs – Ankur Aug 21 '19 at 21:42
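
A minimal sketch of the side-input approach suggested in the last comment, assuming the API response does not depend on individual elements. `call_api`, `keep_if_below`, and the in-memory rows are hypothetical stand-ins for the real HTTP call and the BigQuery read; this is one possible shape, not a definitive implementation:

```python
def call_api():
    """Hypothetical stand-in for the real HTTP call; returns the parsed body."""
    return {'some_other_property': 10}


def keep_if_below(element, response_body_parsed):
    """Return True when the element should stay in the PCollection."""
    return element['property'] < response_body_parsed['some_other_property']


def run(argv=None):
    # Imported lazily so the helpers above can be exercised without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        # A one-element PCollection whose single value is the API response.
        response = (
            p
            | 'Seed' >> beam.Create([None])
            | 'Call external service' >> beam.Map(lambda _: call_api())
        )

        # Assumption: in the real job this would be the BigQuery read.
        rows = p | 'Fake rows' >> beam.Create([{'property': 3}, {'property': 42}])

        # The response is handed to every keep_if_below call as a side input.
        filtered = rows | 'Filter on response' >> beam.Filter(
            keep_if_below, beam.pvalue.AsSingleton(response))
        filtered | 'Print kept rows' >> beam.Map(print)


if __name__ == '__main__':
    run()
```

Because the seed PCollection holds a single element, the call should run once per pipeline execution rather than once per row, although a runner may retry the step on failure.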

1 Answer


Use the __init__ function.

class CallExternalServiceAndFilter(beam.DoFn):
    def __init__(self):
        super().__init__()
        # Make the HTTP call here instead of in process().
        self.response_body_parsed = call_api()

    def to_runner_api_parameter(self, unused_context):
        pass

    def process(self, element, **kwargs):
        # No HTTP call here: reuse the response stored in __init__
        # to decide whether to yield the element.
        if element['property'] < self.response_body_parsed['some_other_property']:
            logging.info("Yielding element")
            yield element
        else:
            logging.info("Not yielding element")

Or better yet, just call your API beforehand (on your local machine that builds the pipeline), and assign the values in __init__.

response_body_parsed = call_api()

class CallExternalServiceAndFilter(beam.DoFn):
    def __init__(self):
        super().__init__()
        self.response_body_parsed = response_body_parsed

    def to_runner_api_parameter(self, unused_context):
        pass

    def process(self, element, **kwargs):
        # No HTTP call here: reuse the response assigned in __init__
        # to decide whether to yield the element.
        if element['property'] < self.response_body_parsed['some_other_property']:
            logging.info("Yielding element")
            yield element
        else:
            logging.info("Not yielding element")

You said that using setup still does multiple calls. Is this still the case with __init__ (if you do the API call in the DoFn, and not beforehand)? The difference between __init__ and setup is still unclear to me.

Frederik Bode
  • I would suggest using the setup method of the DoFn instead of __init__: https://stackoverflow.com/questions/53034374/apache-beam-dofn-setup-equivalent-in-python-sdk – Ankur Aug 22 '19 at 19:43
  • Hey @FrederikBode, even in this case the function is called multiple times. Not for each element, but still more than once; I can see it in the logs. It's still an improvement. – Vee6 Aug 27 '19 at 12:06