
I have a Google Cloud project with VPN-enabled connectivity and a Google Cloud SQL (PostgreSQL) database instance on the same VPN, with SSL enabled. The Cloud SQL instance has both a public and a private IP address: I use the public IP for external connections (for example, from the pgAdmin client tool) and the private IP for internal connectivity such as Dataflow. Now I want to connect to this Cloud SQL instance from Cloud Composer. I used PostgresOperator to connect to the Cloud SQL PostgreSQL database and created a separate connection with the public IP and port under Airflow -> Connections. Since this Cloud SQL instance has SSL enabled, I pushed the certificates to the DAG's GCS location and passed the certificate paths in the connection's extra properties section, like below:

{
   "sslmode": "verify-ca",
   "sslcert": "/home/airflow/gcs/dags/certificates/client-cert.pem",
   "sslca": "/home/airflow/gcs/dags/certificates/server-ca.pem",
   "sslkey": "/home/airflow/gcs/dags/certificates/client-key.pem"
}

I got the below error message:

psycopg2.OperationalError: private key file "/home/airflow/gcs/dags/certificates/client-key.pem" has group or world access; permissions should be u=rw (0600) or less

It would be great if someone could help me fix this issue. Here is the operator I am using:

postgresoperator = PostgresOperator(
    task_id='create_field_reports',
    sql=create_field_reports_query,
    postgres_conn_id='pgconnection_google_private',
    dag=dag
)
lourdu rajan

3 Answers


Cloud Composer uses GCSFUSE to mount certain directories (DAGs/plugins) from Cloud Storage into the Airflow worker pods running in GKE. These mounts use default permissions that cannot be overridden, because permission metadata is not tracked by GCS.

A workaround is to add a BashOperator at the beginning of your DAG that copies the files to a new directory and then runs chmod on them, as sketched below.
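A minimal sketch, assuming the certificate paths from the question and a scratch directory under /tmp (the task name and destination directory are my own choices):

from airflow.operators.bash_operator import BashOperator

# Copy the certificates out of the GCSFUSE mount into a local (non-mounted) path on
# the worker, then tighten permissions so libpq accepts the private key (0600 or stricter).
fix_cert_permissions = BashOperator(
    task_id='fix_cert_permissions',
    bash_command=(
        'mkdir -p /tmp/pg_certs && '
        'cp /home/airflow/gcs/dags/certificates/*.pem /tmp/pg_certs/ && '
        'chmod 0600 /tmp/pg_certs/*.pem'
    ),
    dag=dag,
)

# Run the copy/chmod task before the PostgresOperator task from the question
fix_cert_permissions >> postgresoperator

The connection's extra fields would then need to point at the copied paths under /tmp/pg_certs instead of the GCSFUSE paths.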

hexacyanide
  • Wouldn't copying these files to a new directory still impose GCSFUSE on them, if they are copied to, for instance, /home/airflow/gcs/data/? Where would you copy the files to in order to run `chmod` subsequently? – Leedoe Nov 20 '19 at 13:37
  • Perhaps it's possible to pass the file content as environment variables and store them (and run chmod) on the underlying VM running the Dag? – Leedoe Nov 20 '19 at 17:11
  • Workers run in Kubernetes pods, which only mount specific paths using GCSFUSE. You can always use a non-GCSFUSE path like `/tmp`. – hexacyanide Nov 20 '19 at 17:14
  • Thanks, that clarified things for me :-) – Leedoe Nov 21 '19 at 08:13
  • But how would I access the `/tmp` folder from a DAG? `/home/airflow/gcs/tmp/` can't be accessed directly since it's not GCSFUSE-mounted, if my understanding is correct. – Leedoe Nov 21 '19 at 08:48

You may want to use gcp_sql_operator instead, as it takes care of the Cloud SQL proxy. You can see an example in my answer to a related question:

Google Cloud Composer and Google Cloud SQL

Tan Duong

This requires several steps, all of them sparsely documented on the web. It does not use SSL, but I think it can be refactored to use SSL:

  1. Define a Cloud SQL connection factory with proxy:
import os
from urllib.parse import quote_plus

from airflow.models import Variable


def create_cloudsql_conn(name, user, password, instance, database, port='3308'):
    """
    MySQL: connect via proxy over TCP (specific proxy version).
    It uses the AIRFLOW_CONN_* format to create a connection named PROXY_ODS_VAT.
    https://airflow.readthedocs.io/en/latest/howto/connection/gcp_sql.html
    """
    os.environ[f'AIRFLOW_CONN_{name.upper()}'] = \
        "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
        "database_type=mysql&" \
        "project_id={project_id}&" \
        "location={location}&" \
        "instance={instance}&" \
        "use_proxy=True&" \
        "sql_proxy_version=v1.13&" \
        "sql_proxy_use_tcp=True".format(
            user=quote_plus(user),
            password=quote_plus(password),
            public_ip='0.0.0.0',
            public_port=port,
            database=quote_plus(database),
            project_id=quote_plus(Variable.get('gcp_project')),
            location=quote_plus(Variable.get('gce_region')),
            instance=quote_plus(instance),
        )
  2. In your DAG file, create the connection:
create_cloudsql_conn(
    'proxy_ods_vat',
    Variable.get('gcsql_ods_user'),
    Variable.get('gcsql_ods_password'),
    Variable.get('gcsql_ods_instance'),
    Variable.get('gcsql_vat_database')
)
  3. Create a CloudSqlQueryOperator:
from airflow.contrib.operators.gcp_sql_operator import CloudSqlQueryOperator

cloudsql_prep = CloudSqlQueryOperator(
    task_id="cloudsql-load-prep",
    gcp_cloudsql_conn_id='proxy_ods_vat',
    sql='templates/ingestion_prep.sql',
    params={
        'database': Variable.get('gcsql_vat_database')
    },
)
  4. Use your operator (see the short example below).
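For example, a minimal sketch of wiring the task into the DAG (the downstream load_task is hypothetical):

# Hypothetical ordering: run the Cloud SQL prep query before a downstream load task
cloudsql_prep >> load_task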
Alan Borsato