
Is there any way to pool connections or otherwise share a connection across multiple processes?

I am trying to use one connection across multiple processes. Here is the code (running on Python 2.7 with pyodbc).

# Import custom python packages
import pathos.multiprocessing as mp
import pyodbc

class MyManagerClass(object):
    def __init__(self):
        self.conn = None
        self.result = []
    def connect_to_db(self):
        conn = pyodbc.connect("DSN=cpmeast;UID=dntcore;PWD=dntcorevs2")
        cursor = conn.cursor()
        self.conn = conn
        return cursor

    def read_data(self, *args):
        cursor = args[0][0]
        data = args[0][1]
        print 'Running query'
        cursor.execute("WAITFOR DELAY '00:00:02';select GETDATE(), '"+data+"';")
        self.result.append(cursor.fetchall())

def read_data(*args):
    print 'Running query', args
#     cursor.execute("WAITFOR DELAY '00:00:02';select GETDATE(), '"+data+"';")


def main():
    dbm = MyManagerClass()
    conn = pyodbc.connect("DSN=cpmeast;UID=dntcore;PWD=dntcorevs2")
    cursor = conn.cursor()

    pool = mp.ProcessingPool(4)
    for i in pool.imap(dbm.read_data, ((cursor, 'foo'), (cursor, 'bar'))):
        print i
    pool.close()
    pool.join()

    cursor.close()
    dbm.conn.close()

    print 'Result', dbm.result
    print 'Closed'

if __name__ == '__main__':
    main()

I am getting the following error:

Process PoolWorker-1:
Traceback (most recent call last):
  File "/home/amit/envs/py_env_clink/lib/python2.7/site-packages/processing/process.py", line 227, in _bootstrap
    self.run()
  File "/home/amit/envs/py_env_clink/lib/python2.7/site-packages/processing/process.py", line 85, in run
    self._target(*self._args, **self._kwargs)
  File "/home/amit/envs/py_env_clink/lib/python2.7/site-packages/processing/pool.py", line 54, in worker
    for job, i, func, args, kwds in iter(inqueue.get, None):
  File "/home/amit/envs/py_env_clink/lib/python2.7/site-packages/processing/queue.py", line 327, in get
    return recv()
  File "/home/amit/envs/py_env_clink/lib/python2.7/site-packages/dill-0.2.4-py2.7.egg/dill/dill.py", line 209, in loads
    return load(file)
  File "/home/amit/envs/py_env_clink/lib/python2.7/site-packages/dill-0.2.4-py2.7.egg/dill/dill.py", line 199, in load
    obj = pik.load()
  File "/home/amit/envs/py_env_clink/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/home/amit/envs/py_env_clink/lib/python2.7/pickle.py", line 1083, in load_newobj
    obj = cls.__new__(cls, *args)
TypeError: object.__new__(pyodbc.Cursor) is not safe, use pyodbc.Cursor.__new__()
Process PoolWorker-2:
(identical traceback to PoolWorker-1, ending in the same TypeError)
  • Not sure it will help, but since pyodbc seems to complain about how Python tries to instantiate new cursor objects in order to send them to your pool's processes, can't you initialize a cursor in each process instead of passing them as arguments to `imap`? – mgc Feb 11 '16 at 20:26
  • I tried passing the connection, but it still fails. – amulllb Feb 11 '16 at 20:49
  • Possible duplicate of [Python multiprocessing and database access with pyodbc "is not safe"?](http://stackoverflow.com/questions/1537809/python-multiprocessing-and-database-access-with-pyodbc-is-not-safe) – Gord Thompson Feb 12 '16 at 14:09

1 Answer


The problem is in the pickling stage: pickle has no inherent way to serialize a database connection. Consider:

import pickle
import pymssql

a = {'hello': 'world'}
server = 'server'
username = 'username'
password = 'password'
database = 'database'
conn = pymssql.connect(host=server, user=username,
                       password=password, database=database)

# Pickling the connection object fails...
with open('filename.pickle', 'wb') as handle:
    pickle.dump(conn, handle, protocol=pickle.HIGHEST_PROTOCOL)

# ...so execution never reaches the round-trip check below.
with open('filename.pickle', 'rb') as handle:
    b = pickle.load(handle)
print(a == b)

This results in the following error message:

Traceback (most recent call last):
  File "pickle_ex.py", line 10, in <module>
    pickle.dump(conn, handle, protocol=pickle.HIGHEST_PROTOCOL)
  File "stringsource", line 2, in _mssql.MSSQLConnection.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

But if you replace conn with a in pickle.dump, the code runs and prints True. You might be able to define a custom __reduce__ method in your class, but I wouldn't try it: it would result in temp tables acting like global temp tables, but accessible only across these processes, which shouldn't be allowed to happen anyway.
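The safer pattern, as the comments suggest, is not to share the connection at all: let each worker process open its own. Below is a minimal sketch of that approach (not from the original answer), using the standard multiprocessing module and the DSN string from the question; adapt the names to your environment.

import multiprocessing as mp
import pyodbc

conn = None  # one connection per worker process, set by the initializer

def init_worker():
    # Runs once in each worker after it starts, so the connection is
    # created locally and never has to be pickled.
    global conn
    conn = pyodbc.connect("DSN=cpmeast;UID=dntcore;PWD=dntcorevs2")

def read_data(data):
    cursor = conn.cursor()
    cursor.execute("WAITFOR DELAY '00:00:02';select GETDATE(), ?;", data)
    # Convert pyodbc Row objects to plain tuples so the results pickle
    # cleanly on their way back to the parent process.
    rows = [tuple(row) for row in cursor.fetchall()]
    cursor.close()
    return rows

if __name__ == '__main__':
    pool = mp.Pool(4, initializer=init_worker)
    for result in pool.imap(read_data, ['foo', 'bar']):
        print(result)
    pool.close()
    pool.join()

Only the input strings and the plain-tuple results cross the process boundary, so nothing unpicklable is ever serialized.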

Link: the pickle code above is adapted from How can I use pickle to save a dict?