I have a function called connection which connects to my PostgreSQL databases. It can be simplified as:
    import psycopg2 as pg

    def connection():
        _conn_DB = pg.connect(dbname="h2", user="h2naya", host=h2_IP, password=DB_PW)
        _conn_DB.set_client_encoding('UTF8')
        _conn_ML = pg.connect(dbname="postgres", user="h2naya", host="localhost", password=ML_PW)
        _conn_ML.set_client_encoding('UTF8')
        return {"db": _conn_DB, "ml": _conn_ML}
Now I am trying to get data within a 7-day period starting from a specified date, so I create another function:
    import pandas as pd

    conns = connection()

    def get_latest_7d(date_string):
        with conns["db"] as conn:
            # in real life, date_string is formatted into my_sql
            fasting_bg_latest_7d = pd.read_sql(my_sql, conn)
        return fasting_bg_latest_7d
Now I can map get_latest_7d over a list of date strings, e.g. map(get_latest_7d, ["2018-07-01", "2018-07-02", "2018-07-03"]), and it works.
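For completeness, the full serial run looks roughly like this; combining the per-date frames with pd.concat is my own addition, assuming each call returns one DataFrame:

    date_strings = ["2018-07-01", "2018-07-02", "2018-07-03"]
    frames = list(map(get_latest_7d, date_strings))  # one DataFrame per date
    result = pd.concat(frames, ignore_index=True)    # stack into a single DataFrame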
As the list of date strings is really long, I'd like to use multiprocessing.Pool.map to speed up the procedure. However, the code below gives me InterfaceError: connection already closed:
    from multiprocessing import Pool

    with Pool() as p:
        results = p.map(get_latest_7d, ["2018-07-01", "2018-07-02", "2018-07-03"])
I tried different ways, and the only one that works is moving the line conns = connection() inside get_latest_7d and not closing the connections before returning the data frame. By "closing the connections" I mean:
    for conn_val in conns.values():
        conn_val.close()
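So the only variant I could get to work looks roughly like this sketch (my_sql is built from date_string as before):

    def get_latest_7d(date_string):
        conns = connection()  # fresh connections on every call
        with conns["db"] as conn:
            # in real life, date_string is formatted into my_sql
            fasting_bg_latest_7d = pd.read_sql(my_sql, conn)
        # closing the connections here before returning brings the error back
        return fasting_bg_latest_7d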
It seems that I need to create separate connections for each process, and that I must not close them before the process ends. I am curious:
- Why can't the connection be shared across the processes?
- Why does closing a connection in one process affect other processes?
- Is there any better practice for my purpose?
PS: It seems to be recommended to build the connections inside each process, but I am not sure I fully understand why.
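To show what I think that advice means: the sketch below opens one set of connections per worker process via Pool's initializer argument, with connection() and my_sql as defined above. The module-level conns global set in init_worker is my assumption of the pattern, not code I have tested:

    from multiprocessing import Pool
    import pandas as pd

    conns = None  # set separately in each worker process by init_worker

    def init_worker():
        # runs once per worker process, so every process
        # gets its own psycopg2 connections
        global conns
        conns = connection()

    def get_latest_7d(date_string):
        with conns["db"] as conn:
            # in real life, date_string is formatted into my_sql
            return pd.read_sql(my_sql, conn)

    if __name__ == "__main__":
        with Pool(initializer=init_worker) as p:
            results = p.map(get_latest_7d, ["2018-07-01", "2018-07-02", "2018-07-03"])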