1

---------------> READ THE EDIT FIRST <---------------

I'm trying to develop a Dataflow pipeline which reads and writes to CloudSQL, but I'm facing a lot of connectivity issues.

First of all, there is not a native template / solution to do that, so I'm using a library developed by the community -> beam-nuggets which provides a collection of transforms for the apache beam python SDK.

This is what I have done so far:

Template

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

from beam_nuggets.io import relational_db


def main():
    # get the cmd args
    db_args, pipeline_args = get_args()

    # Create the pipeline
    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=options) as p:
        source_config = relational_db.SourceConfiguration(
            drivername=db_args.drivername,
            host=db_args.host,
            port=db_args.port,
            database=db_args.database,
            username=db_args.username,
            password=db_args.password,
        )

        data = p | "Reading records from db" >> relational_db.ReadFromDB(
            source_config=source_config,
            table_name=db_args.table
            query='select name, num from months'  # optional. When omitted, all table records are returned.
        )
        records | 'Writing to stdout' >> beam.Map(print)



def get_args():
    parser = argparse.ArgumentParser()
    # adding expected database args
    parser.add_argument('--drivername', dest='drivername', default='mysql+pymysql')
    parser.add_argument('--host', dest='host', default='cloudsql_instance_connection_name')
    parser.add_argument('--port', type=int, dest='port', default=3307)
    parser.add_argument('--database', dest='database', default='irmdb')
    parser.add_argument('--username', dest='username', default='root')
    parser.add_argument('--password', dest='password', default='****')
    parser.add_argument('--table', dest='table', default="table_name")

    parsed_db_args, pipeline_args = parser.parse_known_args()

    return parsed_db_args, pipeline_args


if __name__ == '__main__':
    main()

The job is created properly in Dataflow, but it remains loading without showing any logs:

enter image description here

It appears in red since I stopped the job.

Pipeline options:

enter image description here

Why can't I connect? What am I missing?

Thank you in advance for you help.


-------------------> EDIT <------------------------

Since I wasn't getting any results with the beam-nugget library, I've switched to the cloud-sql-python-connector library, which has been created by Google.

Let's start from scratch.

template.py

import argparse

from google.cloud.sql.connector import connector

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class ReadSQLTable(beam.DoFn):
    """
    parDo class to get all table names of a given cloudSQL database.
    It will return each table name.
    """

    def __init__(self, hostaddr, host, username, password, dbname):
        super(ReadSQLTable, self).__init__()

        self.hostaddr = hostaddr
        self.host = host
        self.username = username
        self.password = password
        self.dbname = dbname

    def process(self, element):
        # Connect to database

        conn = connector.connect(
            self.hostaddr,
            self.host,
            user=self.username,
            password=self.password,
            db=self.dbname
        )

        # Execute a query
        cursor = conn.cursor()
        cursor.execute("SELECT * from table_name")

        # Fetch the results
        result = cursor.fetchall()

        # Do something with the results
        for row in result:
            print(row)


def main(argv=None, save_main_session=True):
    """Main entry point; defines and runs the wordcount pipeline."""

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--hostaddr',
        dest='hostaddr',
        default='project_name:region:instance_name',
        help='Host Address')
    parser.add_argument(
        '--host',
        dest='host',
        default='pymysql',
        help='Host')
    parser.add_argument(
        '--username',
        dest='username',
        default='root',
        help='CloudSQL User')
    parser.add_argument(
        '--password',
        dest='password',
        default='password',
        help='Host')
    parser.add_argument(
        '--dbname',
        dest='dbname',
        default='dbname',
        help='Database name')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection.

        # Create a dummy initiator PCollection with one element
        init = p | 'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])
        tables = init | 'Get table names' >> beam.ParDo(ReadSQLTable(
            host=known_args.host,
            hostaddr=known_args.hostaddr,
            dbname=known_args.dbname,
            username=known_args.username,
            password=known_args.password))


if __name__ == '__main__':
    # logging.getLogger().setLevel(logging.INFO)
    main()

Following the Apache Beam documentation, we're supposed to upload a requirements.txt file to get the necessary packages.

requirements.txt

cloud-sql-python-connector==0.4.0

After that, we should be able to create the dataflow template.

python3 -m template --runner DataflowRunner /
                    --project project_name /
                    --staging_location gs://bucket_name/folder/staging /
                    --temp_location gs://bucket_name/folder/temp /
                    --template_location gs://bucket_name/folder//templates/template-df /
                    --region europe-west1 /
                    --requirements_file requirements.txt

But when I try to execute it, the following error appears:

enter image description here

The libraries are not being installed... neither apache-beam nor cloud-sql-python-connector

enter image description here

Since I was getting this errors on the Cloud shell, I tried to download the packages directly on the shell (Sounds desperate, I am.)

pip3 install -r requirements.txt
pip3 install wheel
pip3 install 'apache-beam[gcp]'

And I execute the function again. Now the template has been created properly:

enter image description here

Additionally, we should create a template_metatada which contains some information regarding the parameters. I don't know if I have to add anything else here, so:

{
  "description": "An example pipeline.",
  "name": "Motor prueba",
  "parameters": [
  ]

FINALLY, I'm able to create and execute the pipeline, but as the last time, it remains loading without showing any logs:

enter image description here

Any clue? :/

jbra95
  • 881
  • 2
  • 11
  • 30
  • 1
    Caveat: I have not used the nugget library. From Dataflow, we usually suggest using the jdbc connector we (Google) provide as it handles a lot of the weird connectivity stuff. Having said that, your default port is showing as 3307, try port 3306 as a quick "eliminate that as a possible problem"? – Gabe Weiss Sep 13 '21 at 16:27
  • If it helps, link to the SO question that was answered by using the jdbc library: https://stackoverflow.com/questions/44699643/connecting-to-cloud-sql-from-dataflow-job – Gabe Weiss Sep 13 '21 at 16:28
  • @GabeWeiss as far as i know, the jdbc connector is not available on Python, is it? For that reason I'm using an external library... – jbra95 Sep 14 '21 at 06:55
  • Derp, that'll teach me to not look closely at the full question before I responded. :) I keep forgetting Dataflow also has Python now (insert "back in my day" comment here). However! There may be help, we have a Python connector that's available as well here: https://github.com/GoogleCloudPlatform/cloud-sql-python-connector It was written by a coworker I work with pretty closely, so if you want to go that route, let me know how it goes and I can pass back feedback and get answers if you need. – Gabe Weiss Sep 14 '21 at 16:59
  • Woa! Fantastic. Let me check it and I will come back soon. Thanks, in advance! @GabeWeiss – jbra95 Sep 15 '21 at 09:01
  • Did the Python connector work for you? (might be a good idea to create a community wiki answer and accept it if that's the case and @GabeWeiss doesn't want to create an answer himself) – fabc Sep 20 '21 at 14:21
  • I'll add an answer just in case. :) – Gabe Weiss Sep 20 '21 at 16:41
  • @JuanBravoRoig were you able to connect to RDBMS source using the python connector? Your input would be helpful – Ambesh Sep 23 '22 at 20:25

1 Answers1

0

I haven't used the nugget.io library, so I'm unfamiliar with how it handles connectivity. I'd suggest trying out the Python connector maintained by Google:

https://github.com/GoogleCloudPlatform/cloud-sql-python-connector

See if that's able to connect for you.

Gabe Weiss
  • 3,134
  • 1
  • 12
  • 15
  • Thanks, Gabe! The main issue it's how to connect it through Dataflow... I'm not an Apache Beam expert ^^. Should i create some kind of personalized function to establish the connection with the SQL? – jbra95 Sep 21 '21 at 11:19
  • Oooooh yeah, I too, am not awesome with Dataflow/Beam. Having said that, I'm pretty sure you need to have the connection usage in a ParDo. Check this answer (don't do what it's doing, but it should give you a guide to creating the connection part): https://stackoverflow.com/questions/50701736/start-cloudsql-proxy-on-python-dataflow-apache-beam/50832174#50832174. Specifically you don't want to do what it's doing because it's allowlisting every connection which is bad. The connector should be able to be used in the connection code so you don't have to open up the instance. – Gabe Weiss Sep 21 '21 at 16:01
  • Hi Gabe, I've updated the question to add more info :) – jbra95 Sep 22 '21 at 16:50
  • mmm, that's weird. Sadly my Dataflow knowledge is pretty limited, let me poke a coworker and see if he has any ideas. – Gabe Weiss Sep 22 '21 at 19:25
  • I would be very grateful! Thanks, Gabe! – jbra95 Sep 23 '21 at 09:13