3

I need to update every record in a spatial database in which I have a data set of points that overlay data set of polygons. For each point feature I want to assign a key to relate it to the polygon feature that it lies within. So if my point 'New York City' lies within polygon USA and for the USA polygon 'GID = 1' I will assign 'gid_fkey = 1' for my point New York City.

To do this I have created the following query.

procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

At present I am getting the cityID info from another query that just selects all cityID where gid_fkey is NULL. Essentially I just need to loop through these and run the query shown earlier. As the query only relies on static information in the other table in theory all of these processes can be run at once. I have implemented the threading procedure below but I can't seem to make the migration to multiprocessing

import psycopg2, pprint, threading, time, Queue

queue = Queue.Queue()
pyConn = psycopg2.connect("dbname='geobase_1' host='localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()

getGID = 'SELECT cityID FROM city'
pyCursor1.execute(getGID)
gidList = pyCursor1.fetchall()

class threadClass(threading.Thread):

def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

def run(self):

        while True:
            gid = self.queue.get()

            procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE wp_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

            pyCursor2 = pyConn.cursor()                         
            pyCursor2.execute(procQuery)

            print gid[0]                    
            print 'Done'

def main():

    for i in range(4):
        t = threadClass(queue)
        t.setDaemon(True)
        t.start()

        for gid in gidList:
            queue.put(gid)

    queue.join()

main()

I'm not even sure if the multithreading is optimal but it is definitely faster than going through one by one.

The machine I will be using has four cores (Quad Core) and a minimal Linux OS with no GUI, PostgreSQL, PostGIS and Python if that makes a difference.

What do I need to change to get this painfully easy multiprocessing task enabled?

user1267259
  • 761
  • 2
  • 10
  • 22
EnE_
  • 531
  • 1
  • 9
  • 19

2 Answers2

5

Okay this is an answer to my own post. Well done me =D

Produces about a 150% increase in speed on my system going from a single core thread to quad core multiprocessing.

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
    multiprocessing.Process.__init__(self)
    self.task_queue = task_queue
    self.result_queue = result_queue

def run(self):
    proc_name = self.name
    while True:
        next_task = self.task_queue.get()
        if next_task is None:
            print 'Tasks Complete'
            self.task_queue.task_done()
            break            
        answer = next_task()
        self.task_queue.task_done()
        self.result_queue.put(answer)
    return


class Task(object):
def __init__(self, a):
    self.a = a

def __call__(self):        
    pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConn.set_isolation_level(0)
    pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

    pyCursor1.execute(procQuery)
    print 'What is self?'
    print self.a

    return self.a

def __str__(self):
    return 'ARC'
def run(self):
    print 'IN'

if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()

num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
for w in consumers:
    w.start()

pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConnX.set_isolation_level(0)
pyCursorX = pyConnX.cursor()

pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')    
temp = pyCursorX.fetchall()    
num_job = temp[0]
num_jobs = num_job[0]

pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')    
cityIdListTuple = pyCursorX.fetchall()    

cityIdList = []

for x in cityIdListTuple:
    cityIdList.append(x[0])


for i in xrange(num_jobs):
    tasks.put(Task(cityIdList[i - 1]))

for i in xrange(num_consumers):
    tasks.put(None)

while num_jobs:
    result = results.get()
    print result
    num_jobs -= 1

Now I have another question which I have posted here:

Create DB connection and maintain on multiple processes (multiprocessing)

Hopefully we can get rid of some overhead and speed this baby up even more.

Community
  • 1
  • 1
EnE_
  • 531
  • 1
  • 9
  • 19
  • hey @ene, if this solved your question, it's good practice to mark it as answered :) – eberbis Mar 08 '16 at 17:36
  • Yeah, it is weird, because I was only a guest user or some such when I posted this question I do not have the ability to mark my own question as correct. You can see the thumbnail image has not updated along with mine. Suggestions on how to resolve are welcome – EnE_ Mar 09 '16 at 04:43
  • 1
    oh yes.. the problem is that you posted your question under a different (unregistered) user (http://stackoverflow.com/users/954992/ene) and you are now using a registered one (http://stackoverflow.com/users/965035/ene) to reply. As you can see the ids on them are different. This might help: http://meta.stackexchange.com/questions/74024/registration-with-my-unregistered-account – eberbis Mar 09 '16 at 12:20
  • Never run an IO operation using Mutli process, always use Async or Multi threading – Mrinal Kamboj Mar 26 '21 at 12:21
1

In plain SQL one could do something like:

UPDATE city ci
SET gid_fkey = co.gid 
FROM country co 
WHERE ST_within(ci.the_geom , co.the_geom) 
AND ci.city_id = _some_parameter_
        ;

There could be a problem if a city would fit into more than one country (causing multiple updates to the same target row), but that is probably not the case in your data.

joop
  • 4,330
  • 1
  • 15
  • 26