I have a collection of documents in a MongoDB database, and I would like to continuously extract the parts of each document that are not already in my MySQL database into a MySQL table hosted on Cloud SQL, to prepare data for a product recommendation algorithm. That is, I want to transform all the data returned by the following MongoDB query:
>>> all_perfumes = list(collection.aggregate([
{"$project": {"d": 1}}
]))
For example, the first document looks like this:
>>> all_perfumes[0]
{'_id': ObjectId('5fd5e617260828c7646000aa'),
 'd': {'attributs': {
     'Doux': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Délicat': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Elegant': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Mature': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Sexy': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Féminin': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Frais': {'claimed_benefit': 0, 'perceived_benefit': 0.35294117647058826},
     'Classe': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Mou': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Décontracté': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Comme les autres': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Jeune femme': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'charmant': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Gai': {'claimed_benefit': 0, 'perceived_benefit': 0.058823529411764705},
     'Propre': {'claimed_benefit': 0, 'perceived_benefit': 0.058823529411764705},
     'Eté': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Rafraîchissant': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Chaud': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Masculin': {'claimed_benefit': 0, 'perceived_benefit': 0.23529411764705882},
     'Fiable': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Mystérieux': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Furtif': {'claimed_benefit': 0, 'perceived_benefit': 0.058823529411764705},
     'Fort': {'claimed_benefit': 0, 'perceived_benefit': 0.4117647058823529},
     'Hivernal': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Herbacé': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Plantes': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Big brands': {'claimed_benefit': 0, 'perceived_benefit': 0.058823529411764705},
     'Luxueux': {'claimed_benefit': 0, 'perceived_benefit': 0.0},
     'Connu': {'claimed_benefit': 0, 'perceived_benefit': 0.23529411764705882},
     'A la mode': {'claimed_benefit': 0, 'perceived_benefit': 0.0}}}}
by taking the mean of each attribute, and ingest each result into a MySQL table on Cloud SQL. I have not created this table yet, so I thought of doing this from Cloud Shell:
import datetime
import pandas as pd
import pytz
import pymongo
import sqlalchemy
from sqlalchemy import create_engine
### CREATING DATAFRAME ###
# NOTE(review): credentials are hard-coded in this connection string; consider
# reading the URI from an environment variable or a secret manager instead.
mongo_client = pymongo.MongoClient("mongodb+srv://username:password@cluster0.n2hnd.mongodb.net/ifresearch?retryWrites=true&w=majority")
collection = mongo_client.test.sephora_backup3
# Keep only the 'd' sub-document (MongoDB returns '_id' as well).
all_perfumes = list(collection.aggregate([
    {"$project": {"d": 1}}
]))
# TODO: filter out the perfumes that are already in the MySQL database,
# or the ones that have not been modified since the last run.


def attribute_means(attributes):
    """Return ``{attribute_name: mean of that attribute's scores}``.

    Each value of *attributes* is a dict of numeric scores, e.g.
    ``{'claimed_benefit': 0, 'perceived_benefit': 0.35}``. The input dict is
    not modified. An attribute with no scores maps to ``None``.
    """
    means = {}
    for name, scores in attributes.items():
        values = list(scores.values())
        means[name] = sum(values) / len(values) if values else None
    return means


rows_list = []
for perfume in all_perfumes:
    attributes = (perfume.get('d') or {}).get('attributs')
    if not attributes:
        # Nothing to score for documents without attributes.
        continue
    # Keep the Mongo id so rows can later be matched against what is
    # already stored in MySQL (see TODO above).
    row = {'perfume_id': str(perfume['_id'])}
    row.update(attribute_means(attributes))
    rows_list.append(row)
df = pd.DataFrame(rows_list)
db = 'scentmate'
db_tbl_name = 'scores'
def dtype_mapping():
    """Map pandas dtype names (``str(series.dtype)``) to MySQL column types.

    Not perfect, but close enough to create a table from a DataFrame.
    """
    return {'object': 'TEXT',
            # MySQL INT is 32-bit; BIGINT is needed to hold any int64 value.
            'int64': 'BIGINT',
            'int32': 'INT',
            # MySQL FLOAT is single precision; DOUBLE preserves float64 values.
            'float64': 'DOUBLE',
            'float32': 'FLOAT',
            # pandas reports datetime/timedelta dtypes with a unit, e.g.
            # 'datetime64[ns]'; the unit-less keys are kept for compatibility.
            'datetime64': 'DATETIME',
            'datetime64[ns]': 'DATETIME',
            'bool': 'TINYINT',
            'category': 'TEXT',
            'timedelta[ns]': 'TEXT',
            'timedelta64[ns]': 'TEXT'}
def mysql_engine(user='root', password='', host='0.1.2.3', port=3306, database='scentmate'):
    """Create a SQLAlchemy engine for a MySQL database (MySQLdb driver).

    Args:
        user, password: credentials. They are percent-encoded so characters
            such as '@', ':' or '/' in a password do not corrupt the URL.
        host, port: server address. The host default is a placeholder;
            3306 is MySQL's standard port.
        database: schema selected for the connection.

    Returns:
        A SQLAlchemy engine (connections are opened lazily).

    NOTE(review): "OperationalError 2003 ... (110)" on connect looks like a
    TCP timeout, i.e. the Cloud SQL instance is not reachable from this host.
    Either add the client's public IP to the instance's authorized networks,
    or run the Cloud SQL Auth Proxy locally and connect to host='127.0.0.1'.
    """
    from urllib.parse import quote_plus

    # utf8mb4 is MySQL's full UTF-8; its 'utf8' alias cannot store 4-byte characters.
    url = "mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8mb4".format(
        quote_plus(str(user)), quote_plus(str(password)), host, port, database)
    return create_engine(url)
def mysql_conn(engine):
    """Return a raw DB-API connection obtained from *engine*.

    The caller is responsible for closing the returned connection.
    """
    return engine.raw_connection()
def gen_tbl_cols_sql(df, dmap=None):
    """Build the column-definition list of a CREATE TABLE statement for *df*.

    The result starts with an auto-increment surrogate key ``pi_db_uid``,
    followed by one backtick-quoted ``name TYPE`` entry per DataFrame column.

    Args:
        df: DataFrame whose column names and dtypes describe the table.
        dmap: optional mapping of dtype name to MySQL type; defaults to
            ``dtype_mapping()``. Dtypes missing from it map to TEXT.

    Returns:
        A comma-separated SQL fragment, e.g.
        "pi_db_uid INT AUTO_INCREMENT PRIMARY KEY, `Frais` DOUBLE".
    """
    if dmap is None:
        dmap = dtype_mapping()
    # An empty column name is not a valid SQL identifier.
    df1 = df.rename(columns={"": "nocolname"})
    cols = ["pi_db_uid INT AUTO_INCREMENT PRIMARY KEY"]
    for name, dtype in df1.dtypes.items():
        # Quote identifiers: names such as 'Comme les autres' contain spaces
        # and would otherwise be a SQL syntax error. Embedded backticks are doubled.
        quoted = "`{0}`".format(str(name).replace("`", "``"))
        cols.append("{0} {1}".format(quoted, dmap.get(str(dtype), "TEXT")))
    return ", ".join(cols)
def create_mysql_tbl_schema(df, conn, db, tbl_name):
    """Create table *tbl_name* in database *db* with columns derived from *df*.

    Uses ``CREATE TABLE IF NOT EXISTS`` so a recurring ETL run does not fail
    once the table exists. Commits on success; the caller keeps ownership of
    *conn* and must close it.

    Args:
        df: DataFrame whose columns/dtypes define the table schema.
        conn: an open DB-API connection (e.g. from ``mysql_conn``).
        db: database (schema) name.
        tbl_name: table name.
    """
    def quote(identifier):
        # Backtick-quote a MySQL identifier, doubling embedded backticks.
        return "`{0}`".format(str(identifier).replace("`", "``"))

    tbl_cols_sql = gen_tbl_cols_sql(df)
    cur = conn.cursor()
    try:
        # Execute the statements separately: sending "USE ...; CREATE ..." as
        # one string depends on the driver's multi-statement support.
        cur.execute("USE {0}".format(quote(db)))
        cur.execute("CREATE TABLE IF NOT EXISTS {0} ({1})".format(quote(tbl_name), tbl_cols_sql))
    finally:
        cur.close()
    conn.commit()
def df_to_mysql(df, engine, tbl_name, if_exists='append', index=False):
    """Write the rows of *df* into table *tbl_name*.

    By default rows are appended to an existing table, so a schema created
    beforehand (e.g. by ``create_mysql_tbl_schema``) is kept. The previous
    ``if_exists='replace'`` dropped that table and rebuilt it from pandas'
    own type guesses. The DataFrame index is not written by default; it
    would otherwise add an ``index`` column that the created schema lacks.

    Args:
        df: DataFrame to write.
        engine: SQLAlchemy engine/connection (or sqlite3 connection).
        tbl_name: target table name.
        if_exists: passed to ``DataFrame.to_sql`` ('fail', 'replace', 'append').
        index: whether to write the DataFrame index as a column.
    """
    df.to_sql(tbl_name, engine, if_exists=if_exists, index=index)
# Create the target table (if needed) and load the scores into it.
# NOTE(review): the reported "OperationalError 2003 ... (110)" is raised while
# opening the first connection and looks like a network timeout: the Cloud SQL
# instance is likely not reachable from Cloud Shell. Authorize the client IP on
# the instance, or run the Cloud SQL Auth Proxy and pass host='127.0.0.1' with
# real credentials to mysql_engine().
engine = mysql_engine()
conn = mysql_conn(engine)
try:
    create_mysql_tbl_schema(df, conn, db, db_tbl_name)
finally:
    # Release the raw connection even if table creation fails.
    conn.close()
df_to_mysql(df, engine, db_tbl_name)
engine.dispose()
But I got the following error:
(etl_env) mikempc3@cloudshell:~$ python3 et_scores.py
Traceback (most recent call last):
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3212, in _wrap_pool_connect
return fn()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 301, in connect
return _ConnectionFairy._checkout(self)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 761, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
rec = pool._do_get()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
self._dec_overflow()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
with_traceback=exc_tb,
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 142, in _do_get
return self._create_connection()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
return _ConnectionRecord(self)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
self.__connect()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
pool.logger.debug("Error on connect(): %s", e)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
with_traceback=exc_tb,
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
connection = pool._invoke_creator(self)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/create.py", line 578, in connect
return dialect.connect(*cargs, **cparams)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 584, in connect
return self.dbapi.connect(*cargs, **cparams)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/MySQLdb/__init__.py", line 130, in Connect
return Connection(*args, **kwargs)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/MySQLdb/connections.py", line 185, in __init__
super().__init__(*args, **kwargs2)
MySQLdb._exceptions.OperationalError: (2003, "Can't connect to MySQL server on '35.240.96.173' (110)")
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "et_scores.py", line 103, in <module>
create_mysql_tbl_schema(df, mysql_conn(mysql_engine()), db, db_tbl_name)
File "et_scores.py", line 71, in mysql_conn
conn = engine.raw_connection()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3245, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3216, in _wrap_pool_connect
e, dialect, self
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2069, in _handle_dbapi_exception_noconnection
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 3212, in _wrap_pool_connect
return fn()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 301, in connect
return _ConnectionFairy._checkout(self)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 761, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
rec = pool._do_get()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
self._dec_overflow()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
with_traceback=exc_tb,
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/impl.py", line 142, in _do_get
return self._create_connection()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
return _ConnectionRecord(self)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
self.__connect()
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
pool.logger.debug("Error on connect(): %s", e)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
with_traceback=exc_tb,
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
connection = pool._invoke_creator(self)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/create.py", line 578, in connect
return dialect.connect(*cargs, **cparams)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 584, in connect
return self.dbapi.connect(*cargs, **cparams)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/MySQLdb/__init__.py", line 130, in Connect
return Connection(*args, **kwargs)
File "/home/mikempc3/etl_env/lib/python3.7/site-packages/MySQLdb/connections.py", line 185, in __init__
super().__init__(*args, **kwargs2)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2003, "Can't connect to MySQL server on '35.240.96.173' (110)")
(Background on this error at: http://sqlalche.me/e/14/e3q8)
(etl_env) mikempc3@cloudshell:~$