---------------> READ THE EDIT FIRST <---------------
I'm trying to develop a Dataflow pipeline which reads from and writes to Cloud SQL, but I'm facing a lot of connectivity issues.
First of all, there is no native template/solution to do that, so I'm using a community-developed library, beam-nuggets, which provides a collection of transforms for the Apache Beam Python SDK.
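For reference, the library is installed from PyPI; as far as I understand, it also has to be made available to the Dataflow workers (e.g. through a requirements file):

pip3 install beam-nuggets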
This is what I have done so far:
template.py
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from beam_nuggets.io import relational_db


def main():
    # get the cmd args
    db_args, pipeline_args = get_args()

    # Create the pipeline
    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=options) as p:
        source_config = relational_db.SourceConfiguration(
            drivername=db_args.drivername,
            host=db_args.host,
            port=db_args.port,
            database=db_args.database,
            username=db_args.username,
            password=db_args.password,
        )

        records = p | "Reading records from db" >> relational_db.ReadFromDB(
            source_config=source_config,
            table_name=db_args.table,
            query='select name, num from months'  # optional. When omitted, all table records are returned.
        )
        records | 'Writing to stdout' >> beam.Map(print)


def get_args():
    parser = argparse.ArgumentParser()
    # adding expected database args
    parser.add_argument('--drivername', dest='drivername', default='mysql+pymysql')
    parser.add_argument('--host', dest='host', default='cloudsql_instance_connection_name')
    parser.add_argument('--port', type=int, dest='port', default=3307)
    parser.add_argument('--database', dest='database', default='irmdb')
    parser.add_argument('--username', dest='username', default='root')
    parser.add_argument('--password', dest='password', default='****')
    parser.add_argument('--table', dest='table', default="table_name")

    parsed_db_args, pipeline_args = parser.parse_known_args()
    return parsed_db_args, pipeline_args


if __name__ == '__main__':
    main()
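I create the Dataflow job with a command along these lines (project and bucket values are placeholders; the database arguments fall back to the defaults defined in get_args unless overridden, and parse_known_args forwards everything else to the pipeline options):

python3 -m template \
    --runner DataflowRunner \
    --project project_name \
    --region europe-west1 \
    --staging_location gs://bucket_name/folder/staging \
    --temp_location gs://bucket_name/folder/temp \
    --host cloudsql_instance_connection_name \
    --database irmdb \
    --table months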
The job is created properly in Dataflow, but it just keeps loading without showing any logs:
It appears in red since I stopped the job.
Pipeline options:
Why can't I connect? What am I missing?
Thank you in advance for your help.
-------------------> EDIT <------------------------
Since I wasn't getting any results with the beam-nuggets library, I've switched to the cloud-sql-python-connector library, which is developed by Google.
Let's start from scratch.
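Before wiring it into Beam, this is roughly how the connector is used on its own according to the library's README for the 0.x releases (instance connection name and credentials are placeholders; the query is just a sanity check):

from google.cloud.sql.connector import connector

# Connect using the instance connection name and the pymysql driver
conn = connector.connect(
    "project_name:region:instance_name",
    "pymysql",
    user="root",
    password="****",
    db="dbname"
)

cursor = conn.cursor()
cursor.execute("SELECT NOW()")  # simple sanity-check query
print(cursor.fetchone())
cursor.close()
conn.close()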
template.py
import argparse

from google.cloud.sql.connector import connector

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class ReadSQLTable(beam.DoFn):
    """
    ParDo class that connects to a given Cloud SQL database
    and yields every row of a table.
    """
    def __init__(self, hostaddr, host, username, password, dbname):
        super(ReadSQLTable, self).__init__()
        self.hostaddr = hostaddr
        self.host = host
        self.username = username
        self.password = password
        self.dbname = dbname

    def process(self, element):
        # Connect to the database. The first argument is the instance
        # connection name, the second one is the driver ("pymysql").
        conn = connector.connect(
            self.hostaddr,
            self.host,
            user=self.username,
            password=self.password,
            db=self.dbname
        )

        # Execute a query
        cursor = conn.cursor()
        cursor.execute("SELECT * from table_name")

        # Fetch the results
        result = cursor.fetchall()
        cursor.close()
        conn.close()

        # Emit each row downstream
        for row in result:
            yield row


def main(argv=None, save_main_session=True):
    """Main entry point; defines and runs the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--hostaddr',
        dest='hostaddr',
        default='project_name:region:instance_name',
        help='Cloud SQL instance connection name')
    parser.add_argument(
        '--host',
        dest='host',
        default='pymysql',
        help='Database driver')
    parser.add_argument(
        '--username',
        dest='username',
        default='root',
        help='CloudSQL User')
    parser.add_argument(
        '--password',
        dest='password',
        default='password',
        help='CloudSQL Password')
    parser.add_argument(
        '--dbname',
        dest='dbname',
        default='dbname',
        help='Database name')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        # Create a dummy initiator PCollection with one element
        init = p | 'Begin pipeline with initiator' >> beam.Create(['All tables initializer'])

        tables = init | 'Get table names' >> beam.ParDo(ReadSQLTable(
            host=known_args.host,
            hostaddr=known_args.hostaddr,
            dbname=known_args.dbname,
            username=known_args.username,
            password=known_args.password))


if __name__ == '__main__':
    # logging.getLogger().setLevel(logging.INFO)
    main()
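Just to get something into the worker logs, one could also hang a simple logging step off the ParDo output, right after it inside the with block (a sketch on my side, not part of the template above; it assumes ReadSQLTable yields the rows):

import logging

tables | 'Log rows' >> beam.Map(lambda row: logging.info("row: %s", row))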
Following the Apache Beam documentation, we're supposed to upload a requirements.txt file so that the necessary packages get installed on the workers.
requirements.txt
cloud-sql-python-connector==0.4.0
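I'm not 100% sure the connector pulls in the MySQL driver by itself, so it might be safer to pin PyMySQL explicitly as well (an assumption on my side, not something I've verified in the library docs):

cloud-sql-python-connector==0.4.0
PyMySQL==1.0.2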
After that, we should be able to create the Dataflow template.
python3 -m template --runner DataflowRunner \
    --project project_name \
    --staging_location gs://bucket_name/folder/staging \
    --temp_location gs://bucket_name/folder/temp \
    --template_location gs://bucket_name/folder/templates/template-df \
    --region europe-west1 \
    --requirements_file requirements.txt
But when I try to execute it, the following error appears:
The libraries are not being installed: neither apache-beam nor cloud-sql-python-connector.
Since I was getting these errors in Cloud Shell, I tried to install the packages directly in the shell (sounds desperate, and I am):
pip3 install -r requirements.txt
pip3 install wheel
pip3 install 'apache-beam[gcp]'
Then I executed the command again, and this time the template was created properly:
Additionally, we should create a template_metadata file, which contains some information about the parameters. I don't know if I have to add anything else here, so:
{
    "description": "An example pipeline.",
    "name": "Motor prueba",
    "parameters": [
    ]
}
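For reference, I then execute the template from the console, or with something along these lines (the job name is a placeholder):

gcloud dataflow jobs run test-job \
    --gcs-location gs://bucket_name/folder/templates/template-df \
    --region europe-west1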
FINALLY, I'm able to create and execute the pipeline but, as last time, it just keeps loading without showing any logs:
Any clue? :/