10

I have two functions which I use to query database. Assuming two separate queries, how to run these in parallel to query same database, and also wait for both results to return before continuing the execution of the rest of the code?

def query1(param1, param2):
    result = None
    logging.info("Connecting to database...")
    try:
        conn = connect(host=host, port=port, database=db)
        curs = conn.cursor()
        curs.execute(query)
        result = curs
        curs.close()
        conn.close()
    except Exception as e:
        logging.error("Unable to access database %s" % str(e))
    return result


def query2(param1, param2):
    result = None 
    logging.info("Connecting to database...")
    try:
        conn = connect(host=host, port=port, database=db)
        curs = conn.cursor()
        curs.execute(query)
        result = curs
        curs.close()
        conn.close()  
    except Exception as e:
        logging.error("Unable to access database %s" % str(e))    
    return result
Rory Daulton
  • 21,934
  • 6
  • 42
  • 50
ArchieTiger
  • 2,083
  • 8
  • 30
  • 45
  • I guess you can take a look to the [`threading`](https://docs.python.org/3/library/threading.html) library from the standard collection. Their is some good posts here explaining how to use python threads (and gathering their return values) like http://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread-in-python (it's not really *parallelism* on python side, but both requests will be made almost simultaneously). – mgc Jun 22 '16 at 18:55

1 Answers1

12

Here is a multi-threaded code that does what you're trying to accomplish:

from threading import Thread, Lock

class DatabaseWorker(Thread):
    __lock = Lock()

    def __init__(self, db, query, result_queue):
        Thread.__init__(self)
        self.db = db
        self.query = query
        self.result_queue = result_queue

    def run(self):
        result = None
        logging.info("Connecting to database...")
        try:
            conn = connect(host=host, port=port, database=self.db)
            curs = conn.cursor()
            curs.execute(self.query)
            result = curs
            curs.close()
            conn.close()
        except Exception as e:
            logging.error("Unable to access database %s" % str(e))
        self.result_queue.append(result)

delay = 1
result_queue = []
worker1 = DatabaseWorker("db1", "select something from sometable",
        result_queue)
worker2 = DatabaseWorker("db1", "select something from othertable",
        result_queue)
worker1.start()
worker2.start()

# Wait for the job to be done
while len(result_queue) < 2:
    sleep(delay)
job_done = True
worker1.join()
worker2.join()
th3an0maly
  • 3,360
  • 8
  • 33
  • 54
  • Oops, sorry. Edited them to be `worker1` and `worker2`. I had written my test using those variables names first, and later changed them to match your code :) – th3an0maly Jun 23 '16 at 10:12
  • Thank you! by the way I got error on `result_queue.size<2`. that `list has no attribute size` so I changed `while result_queue.size < 2` to `len(result_queue)<2`. And how do I access individual result from both workers? – ArchieTiger Jun 23 '16 at 10:51
  • got it, `result_queue` – ArchieTiger Jun 23 '16 at 11:11
  • @user1128088 Good find again. I've corrected it. Thanks. You can assign identifiers to the result while queueing them. e.g. a `job_id`. Later, you can look it up in the results. If there are too many parallel queries being run at a time, I would suggest making `result_queue` a dictionary with keys being the `job_id`s and values being the actual results. – th3an0maly Jun 23 '16 at 13:25
  • I'm noticing a weird behavior. Sometimes `result_queue[0]` and `result_queue [1]` contain same data(it should never be, they are querying different tables). I'm running this as a `Flask` app so when i restart `Flask` it works again. Any idea why this is happening? – ArchieTiger Jun 30 '16 at 18:32
  • 2
    Can anyone explain what `__lock = Lock()` is doing? Thank you! – ShadyBears Sep 06 '19 at 16:56
  • Will this work OK also with writing to the same table, in parallel? Given that each worker will get a different dataframe, with different IDs – guyts Nov 11 '19 at 19:20
  • Hi, how can i extend this solution to execute as many same queries from same table , somethis like batching but in parallel? For example Select * from table limit 500 offset 100, where i define myself offset and 500 is count from that table – user2171512 Feb 12 '21 at 15:38