
I'm working in Python, adding many records to a Postgres DB. Because I have to insert millions of records, I need to parallelize two functions:

    insert_A(cur)
    insert_B(cur)

where:

    conn = psycopg2.connect(.....)
    cur = conn.cursor()

I tried a solution I found in another post here on Stack Overflow:

    result1 = pool.apply_async(insert_A, cur)
    result2 = pool.apply_async(insert_B, cur)
    answer1 = result1.get(timeout=15)
    answer2 = result2.get(timeout=15)

but I got this error:

    Traceback (most recent call last):
      File "test.py", line 387, in
        answer1 = result1.get(timeout=15)
      File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 558, in get
        raise self._value
    psycopg2.InterfaceError: the cursor has no connection

Could somebody help me, please? :(

maverickk89
  • What I found in the other post is a general way to parallelize 2 functions, obviously nothing to do with Postgres and its connections... here I tried to adapt it, but with this result... – maverickk89 Apr 12 '17 at 10:24
  • Have you tried to establish the connection inside the functions `insert_A` and `insert_B`? – fedterzi Apr 12 '17 at 10:29
  • No, they work on different tables. I need one function to run on one core and the other function on another core. – maverickk89 Apr 12 '17 at 11:07
  • You should then have a look at [how to exchange objects between processes](https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes), since the 2 functions will be spawned on 2 different processes – fedterzi Apr 12 '17 at 12:14
  • I don't understand why: the two functions are independent of each other, so they don't have to talk to each other. I simply want two processes, each running its own function, with nothing in common apart from the connection to the same DB... – maverickk89 Apr 12 '17 at 12:24
  • Do you mean that I have to give the connection to each process through a pipe (a pipe for each process)? But it's not clear to me how to use that in my situation and how to apply that article to my code. – maverickk89 Apr 12 '17 at 12:38
  • I tried this way: `if __name__ == '__main__': p = Process(target=insert_measurement_real, args=(cur,)) q = Process(target=insert_measurement_daily, args=(cur2,)) p.start() q.start() p.join() q.join()` but after one insertion, done correctly by both processes, only the first continues; the second stops with the error: DatabaseError: error with status PGRES_TUPLES_OK and no message from the libpq – maverickk89 Apr 12 '17 at 14:10
  • the complete error is: Process Process-2: Traceback (most recent call last): File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run self._target(*self._args, **self._kwargs) File "test.py", line 350, in insert_measurement_daily cur.execute(....) DatabaseError: error with status PGRES_TUPLES_OK and no message from the libpq – maverickk89 Apr 12 '17 at 14:18
  • I found [this issue](https://github.com/psycopg/psycopg2/issues/281) related to psycopg2, basically it says that you cannot use a connection in a separate process, you have to create a new connection in the working process, as I suggested in the first comment (last comment in the issue) – fedterzi Apr 12 '17 at 14:20
  • OK, now I've solved it!! Thank you!! Last question: do these processes work with cores or threads? Because if my processor is a dual core with 4 threads, in the first case I would create at most 2 processes, but if they work with threads I would create 4 processes... in the end I want to maximize the parallelism without overloading the processor... – maverickk89 Apr 12 '17 at 14:37
  • CPython memory management is not thread-safe, so the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock) prevents multiple native threads from executing Python bytecodes at once. The subprocesses will work with cores – fedterzi Apr 12 '17 at 14:41
  • Could you explain this better, please: "CPython memory management is not thread-safe, so the GIL prevents multiple native threads from executing Python bytecodes at once"? – maverickk89 Apr 12 '17 at 14:47
  • I think it is well explained [here](http://stackoverflow.com/questions/1294382/what-is-a-global-interpreter-lock-gil) – fedterzi Apr 12 '17 at 15:03
  • ok, thank you!! :) – maverickk89 Apr 12 '17 at 15:11
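
Regarding the cores-versus-threads question in the comments above: `multiprocessing.cpu_count()` reports the number of logical CPUs (hardware threads), so it can serve as an upper bound on how many worker processes to spawn. A minimal illustration:

    from multiprocessing import cpu_count

    # cpu_count() counts logical CPUs (hardware threads): a dual-core
    # machine with 4 hardware threads reports 4. For CPU-bound workers
    # this is a reasonable upper bound on the number of processes.
    print("logical CPUs available: %d" % cpu_count())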

2 Answers


In the end I solved it this way:

    from multiprocessing import Process

    if __name__ == '__main__':
        p = Process(target=insert_A, args=(ARG,))
        q = Process(target=insert_measurement_real, args=(ARG,))
        p.start()
        q.start()
        p.join()
        q.join()

and by creating the connection inside each function, instead of creating it outside and passing it in as a parameter...
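
A minimal sketch of that pattern, with a hypothetical DSN, table names, and values standing in for the real ones:

    import psycopg2
    from multiprocessing import Process

    def insert_A():
        # Each worker opens its own connection: psycopg2 connections and
        # cursors cannot be shared across processes.
        conn = psycopg2.connect("dbname=mydb user=myuser")  # placeholder DSN
        cur = conn.cursor()
        cur.execute("INSERT INTO table_a VALUES (%s)", (1,))  # placeholder insert
        conn.commit()
        cur.close()
        conn.close()

    def insert_B():
        conn = psycopg2.connect("dbname=mydb user=myuser")  # placeholder DSN
        cur = conn.cursor()
        cur.execute("INSERT INTO table_b VALUES (%s)", (1,))  # placeholder insert
        conn.commit()
        cur.close()
        conn.close()

    if __name__ == '__main__':
        p = Process(target=insert_A)
        q = Process(target=insert_B)
        p.start()
        q.start()
        p.join()
        q.join()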

maverickk89

As [this issue](https://github.com/psycopg/psycopg2/issues/281) reports, psycopg2 does not allow a connection to be used in a separate process; the connection must be established in the working process.
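
The same rule applies to the `multiprocessing.Pool` from the original attempt. One way to respect it (a sketch, with a placeholder DSN, table, and a hypothetical `insert_row` helper) is to open one connection per worker via the pool's `initializer`:

    import psycopg2
    from multiprocessing import Pool

    conn = None  # set once per worker process by the initializer

    def init_worker():
        # Runs in each worker process right after it starts, so every
        # worker gets its own psycopg2 connection.
        global conn
        conn = psycopg2.connect("dbname=mydb user=myuser")  # placeholder DSN

    def insert_row(value):
        # Reuses the worker's own connection for every task it handles.
        cur = conn.cursor()
        cur.execute("INSERT INTO table_a VALUES (%s)", (value,))  # placeholder
        conn.commit()
        cur.close()

    if __name__ == '__main__':
        pool = Pool(processes=2, initializer=init_worker)
        pool.map(insert_row, range(100))
        pool.close()
        pool.join()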

fedterzi
  • This helped me, but IMHO it is not a good pattern. If you have 10 workers, then 10 connections are open at the same time. On the other hand, you lose time while connections are opening and closing. – Tomas Pytel Jun 27 '17 at 11:37