
I have a list of documents in a MongoDB database, and I would like to continually extract the parts of each document that are not already present into a MySQL table hosted on Cloud SQL, to feed a product recommendation algorithm. That is to say, I want to transform all the data from the following MongoDB query:

>>> all_perfumes = list(collection.aggregate([
    {"$project": {"d": 1}}
]))

Like this document:

>>> all_perfumes[0]
{'_id': ObjectId('5fd5e617260828c7646000aa'),
 'd': {'attributs': {'Doux': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Délicat': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Elegant': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Mature': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Sexy': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Féminin': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Frais': {'claimed_benefit': 0, 'perceived_benefit': 0.35294117647058826},
                     'Classe': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Mou': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Décontracté': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Comme les autres': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Jeune femme': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'charmant': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Gai': {'claimed_benefit': 0, 'perceived_benefit': 0.058823529411764705},
                     'Propre': {'claimed_benefit': 0, 'perceived_benefit': 0.058823529411764705},
                     'Eté': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Rafraîchissant': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Chaud': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Masculin': {'claimed_benefit': 0, 'perceived_benefit': 0.23529411764705882},
                     'Fiable': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Mystérieux': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Furtif': {'claimed_benefit': 0, 'perceived_benefit': 0.058823529411764705},
                     'Fort': {'claimed_benefit': 0, 'perceived_benefit': 0.4117647058823529},
                     'Hivernal': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Herbacé': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Plantes': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Big brands': {'claimed_benefit': 0, 'perceived_benefit': 0.058823529411764705},
                     'Luxueux': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
                     'Connu': {'claimed_benefit': 0, 'perceived_benefit': 0.23529411764705882},
                     'A la mode': {'claimed_benefit': 0, 'perceived_benefit': 0.0}}}}

by taking the mean of each attribute's scores and ingesting each result into a MySQL table on Cloud SQL. I have not yet created this table, so I thought of doing the whole thing from the Cloud Shell:

import datetime

import pandas as pd
import pytz
import pymongo

import sqlalchemy
from sqlalchemy import create_engine


### CREATING DATAFRAME ###
mongo_client = pymongo.MongoClient("mongodb+srv://username:password@cluster0.n2hnd.mongodb.net/ifresearch?retryWrites=true&w=majority")
collection = mongo_client.test.sephora_backup3

all_perfumes = list(collection.aggregate([
            {"$project": {"d": 1}}
        ]))

# TODO: filter out the perfumes that are already in the MySQL database,
# or those that have not been modified
rows_list = []
for perfume in all_perfumes:
    for name, scores in perfume['d']['attributs'].items():
        # Mean of claimed_benefit and perceived_benefit for this attribute
        perfume['d']['attributs'][name] = sum(scores.values()) / len(scores)
    rows_list.append(perfume['d']['attributs'])
df = pd.DataFrame(rows_list)


db = 'scentmate'
db_tbl_name = 'scores'


# Create a mapping of df dtypes to MySQL data types (not perfect, but close enough)
def dtype_mapping():
    return {'object' : 'TEXT',
        'int64' : 'INT',
        'float64' : 'FLOAT',
        'datetime64' : 'DATETIME',
        'bool' : 'TINYINT',
        'category' : 'TEXT',
        'timedelta[ns]' : 'TEXT'}
# Create a SQLAlchemy engine
def mysql_engine(user = 'root', password = '', host = '0.1.2.3', port = '???', database = 'scentmate'):
    engine = create_engine("mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8".format(user, password, host, port, database))
    return engine

# Create a MySQL connection from the SQLAlchemy engine
def mysql_conn(engine):
    conn = engine.raw_connection()
    return conn
# Create the SQL fragment declaring column names and types
def gen_tbl_cols_sql(df):
    dmap = dtype_mapping()
    sql = "pi_db_uid INT AUTO_INCREMENT PRIMARY KEY"
    df1 = df.rename(columns = {"" : "nocolname"})
    hdrs = df1.dtypes.index
    hdrs_list = [(hdr, str(df1[hdr].dtype)) for hdr in hdrs]
    for name, dtype in hdrs_list:
        # Backtick-quote column names: the attribute names contain spaces and accents
        sql += ", `{0}` {1}".format(name, dmap[dtype])
    return sql

# Create a MySQL table from a df
def create_mysql_tbl_schema(df, conn, db, tbl_name):
    tbl_cols_sql = gen_tbl_cols_sql(df)
    cur = conn.cursor()
    # MySQLdb runs one statement per execute() by default, so split USE and CREATE
    cur.execute("USE {0}".format(db))
    cur.execute("CREATE TABLE {0} ({1})".format(tbl_name, tbl_cols_sql))
    cur.close()
    conn.commit()

# Write df data to the newly created MySQL table
def df_to_mysql(df, engine, tbl_name):
    # 'append' keeps the schema created above; 'replace' would drop the table
    # and let pandas recreate it with its default column types
    df.to_sql(tbl_name, engine, if_exists='append', index=False)

engine = mysql_engine()
create_mysql_tbl_schema(df, mysql_conn(engine), db, db_tbl_name)
df_to_mysql(df, engine, db_tbl_name)
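The extraction step itself can be sanity-checked locally, independent of any database connection. Here is a minimal sketch using a fabricated two-attribute document (the names and values are made up, shaped like the real ones above):

```python
import pandas as pd

# Fabricated stand-in for one document returned by the aggregation above
fake_perfumes = [
    {'d': {'attributs': {
        'Frais': {'claimed_benefit': 0, 'perceived_benefit': 0.35294117647058826},
        'Fort':  {'claimed_benefit': 0, 'perceived_benefit': 0.4117647058823529},
    }}},
]

rows_list = []
for perfume in fake_perfumes:
    attrs = perfume['d']['attributs']
    # Mean of claimed_benefit and perceived_benefit per attribute
    rows_list.append({name: sum(s.values()) / len(s) for name, s in attrs.items()})

df = pd.DataFrame(rows_list)
print(df)  # one row per perfume, one column per attribute
```

If this produces the expected one-row-per-perfume frame, whatever goes wrong afterwards is on the MySQL side.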

But got:

(etl_env) mikempc3@cloudshell:~$ python3 et_scores.py 
Traceback (most recent call last):
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3212, in _wrap_pool_connect
    return fn()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 301, in connect
    return _ConnectionFairy._checkout(self)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 761, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
    rec = pool._do_get()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
    self._dec_overflow()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 142, in _do_get
    return self._create_connection()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
    return _ConnectionRecord(self)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
    self.__connect()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
    connection = pool._invoke_creator(self)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 584, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/MySQLdb/__init__.py", line 130, in Connect
    return Connection(*args, **kwargs)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/MySQLdb/connections.py", line 185, in __init__
    super().__init__(*args, **kwargs2)
MySQLdb._exceptions.OperationalError: (2003, "Can't connect to MySQL server on '35.240.96.173' (110)")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "et_scores.py", line 103, in <module>
    create_mysql_tbl_schema(df, mysql_conn(mysql_engine()), db, db_tbl_name)
  File "et_scores.py", line 71, in mysql_conn
    conn = engine.raw_connection()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3245, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3216, in _wrap_pool_connect
    e, dialect, self
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2069, in _handle_dbapi_exception_noconnection
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3212, in _wrap_pool_connect
    return fn()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 301, in connect
    return _ConnectionFairy._checkout(self)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 761, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
    rec = pool._do_get()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
    self._dec_overflow()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 142, in _do_get
    return self._create_connection()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
    return _ConnectionRecord(self)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
    self.__connect()
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
    connection = pool._invoke_creator(self)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 584, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/MySQLdb/__init__.py", line 130, in Connect
    return Connection(*args, **kwargs)
  File "/home/mikempc3/etl_env/lib/python3.7/site-packages/MySQLdb/connections.py", line 185, in __init__
    super().__init__(*args, **kwargs2)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2003, "Can't connect to MySQL server on '35.240.96.173' (110)")
(Background on this error at: http://sqlalche.me/e/14/e3q8)
(etl_env) mikempc3@cloudshell:~$ 
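From what I understand, error 2003 with errno 110 is a TCP connection timeout, not an authentication failure: the instance never answered at all. My guess is that Cloud Shell's egress IP is not in the instance's Authorized networks, so either that IP has to be added there, or a Cloud SQL Auth Proxy / connector has to sit in between. Separately, a cleaner DSN can be assembled explicitly; this is only a sketch with placeholder credentials, and naming `pymysql` as the driver is my assumption (the default otherwise is MySQLdb, as in the traceback):

```python
# Placeholder credentials -- assumptions, substitute your own instance's values
user, password = "root", "my-secret"
host, port, database = "35.240.96.173", 3306, "scentmate"

# DSN for SQLAlchemy's create_engine(); "mysql+pymysql" selects the PyMySQL
# driver explicitly instead of relying on the default MySQLdb dialect
dsn = "mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8mb4".format(
    user, password, host, port, database)
print(dsn)
```

Even with a correct DSN, the connection will still time out until the network path (authorized networks or proxy) is fixed.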
Revolucion for Monica
  • The question talks about mysql, but your code uses google bigquery. Which database product do you use? – Shadow May 19 '21 at 11:24
  • @Shadow damn, I use MySQL 9.0 (at least this is what I chose). Sorry I’m very new to cloud computing. – Revolucion for Monica May 19 '21 at 11:28
  • There is no such thing as mysql 9.0, only 8.0 :) But your code tries to upload the data to bigquery, which is not mysql at all. So, you need to replace that part of your code completely. – Shadow May 19 '21 at 11:35
  • Thanks for pointing that out @Shadow. I will try to solve my problem connecting and ingesting dataframes into MySQL on GCP. So, if I've understood correctly, MySQL is for tables and BigQuery for "big data". I chose the first as I am more familiar with it, but my goal is to do product recommendations and I know I can do that with one of the two products. Do you know which? – Revolucion for Monica May 19 '21 at 12:21
  • Both are technically capable of running queries on data and thus are suitable for your use case. Which is more suitable for you I cannot tell, and such questions are explicitly off topic here on SO anyway. – Shadow May 19 '21 at 12:50

0 Answers