
Is there a way to use threads to perform the SQL queries simultaneously so I can cut down on the fetching and processing time of the code below? Or is there a better method to get the same result faster? Given the size of the data sets, it takes more than 22 seconds to get the result, and that is likely to increase. Can I use multithreading or improve the query to get results faster? Thanks in advance!

import pandas as pd
import paramiko
import pymysql
from sshtunnel import SSHTunnelForwarder

def fetch_14df(query1, query2, query3, query4, query5, query6, query7,
               query8, query9, query10, query11, query12, query13, query14):
    mypkey = paramiko.RSAKey.from_private_key_file('xxxx.pem')
    sql_main_database = "xxxx"
    sql_username = "xxxx"
    sql_password = "xxxx"
    sql_hostname = "xxxx"
    sql_port = xxxx
    
    ssh_host = "xxxx.compute-1.amazonaws.com"
    ssh_user = "viewUser"
    ssh_port = 22
    with SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_username=ssh_user,
            ssh_pkey=mypkey,
            remote_bind_address=(sql_hostname, sql_port)) as tunnel:
        conn = pymysql.connect(host='127.0.0.1', user=sql_username,
                passwd=sql_password, db=sql_main_database,
                port=tunnel.local_bind_port)
        df1 = pd.read_sql_query(query1, conn)
        df2 = pd.read_sql_query(query2, conn)
        df3 = pd.read_sql_query(query3, conn)
        df4 = pd.read_sql_query(query4, conn)
        df5 = pd.read_sql_query(query5, conn)
        df6 = pd.read_sql_query(query6, conn)
        df7 = pd.read_sql_query(query7, conn)
        df8 = pd.read_sql_query(query8, conn)
        df9 = pd.read_sql_query(query9, conn)
        df10 = pd.read_sql_query(query10, conn)
        df11 = pd.read_sql_query(query11, conn)
        df12 = pd.read_sql_query(query12, conn)
        df13 = pd.read_sql_query(query13, conn)
        df14 = pd.read_sql_query(query14, conn)
        conn.close()
    return df1, df2, df3, df4, df5, df6, df7, df8, df9, df10, df11, df12, df13, df14

Sample SQL Query:

select distinct partnerAssignedUserId, phone, date, source, planType,
    left(date, 7) as 'month',
    concat(left(date,5), if(week(date,3) < 10, concat('0', week(date,3)), week(date,3))) as 'week'
from
(
-- Web Subs
SELECT
DISTINCT
h.partnerAssignedUserId,
    k.phone,
    k.createTime as date,
    'Web+Sub' as source,
    'Annual' as planType
FROM
    Membership k    
    inner join PartnerReferredUsers h on (k.phone = h.activatePhone)
    LEFT JOIN Users u ON u.phone = k.phone 
    where k.STATUS = 1 and k.test = 0 and k.planType = 1

-- UNION
union

-- Manual Subs
SELECT
DISTINCT
h.partnerAssignedUserId,
    u.phone,
    if(sub.preLiveModeEndTime>NOW(), if(sub.preTrialStart is null,sub.preBillingCycleAnchor,sub.preTrialStart),if(sub.trialStart is null, sub.billingCycleAnchor,sub.trialStart)) as date,
    'Manual Upgrade' as source,
    if(p.`interval` = 'year', 'Annual', 'Monthly') as planType
FROM
    Subscription sub
    INNER JOIN Users u ON sub.customerId = u.stripeCustomerId 
    AND (
                (planId IN (SELECT planId FROM Plan WHERE planType = 1) AND sub.subStatus = 1) 
                OR 
                (prePlanId IN (SELECT planId FROM Plan WHERE planType = 1) AND sub.subStatus = 1 AND preLiveModeEndTime>now())
             )
                         
   AND NOT EXISTS ( SELECT phone FROM Membership k WHERE u.phone = k.phone  and k.status=1)
    inner join PartnerReferredUsers h on (u.phone = h.activatePhone)
    inner join Plan p on p.planId = sub.planId
) raw
ORDER BY date desc;
mrAdamant
  • we can't improve a query we can't see. We can't see the table structures (`show create table {tablename}`) either, to know if this is correctly indexed. This is a rather horrible interface for doing bulk queries. How are the queries used? Try to solve the single-query throughput problem before moving to multiple threads. – danblack Mar 20 '21 at 04:02
  • Thanks @danblack. The dataframe is loaded to provide about 30 visualizations in R after this. – mrAdamant Mar 20 '21 at 04:19
  • So? Are the queries related? Can the SQL be written better to handle the fetching in one go? What are table structures? If it is being done async, can each R visualization work separately? Please edit question to contain these sorts of details. – danblack Mar 20 '21 at 04:25
  • Are the queries you are running performing any insertions or updates that would alter the data used by the other queries? What type of database is being used, and does it support concurrent connections? Assuming that the database supports concurrent connections, and that these queries are *only* reading data, then multi-threading/multi-processing should offer a speedup, but without seeing the queries it's hard to say if multi-threading is feasible. – Tristen Mar 20 '21 at 04:45
  • @Tristen Edited the question with a sample SQL query. The queries are just fetching data without any updates or insertions from a view on AWS RDS. – mrAdamant Mar 20 '21 at 05:01
  • @danblack Edited the question with a sample SQL query. The queries are just fetching data without any updates or insertions from a view on AWS RDS. Each R data visualization can work separately. – mrAdamant Mar 20 '21 at 05:01

3 Answers


It seems like you want to minimize the time spent on your queries, i.e., perform the queries from Python as fast and as efficiently as possible.

Firstly, I would like to say that the comments are correct. When you ask for an "efficient way to do multi-threaded calls" but then introduce queries, you will have to elaborate on what the queries and the table structures look like. Simply put, if some of your queries (q1-q14) are slow on their own, then threading can make things slower. Why? Because the first misconception about threading tends to be parallelism vs. concurrency: CPython threads run concurrently, not in parallel, for CPU-bound work because of the GIL, although the GIL is released while a thread waits on network I/O such as a SQL fetch. The rest I will leave to the sources which discuss this in depth: Good Read 1, Good Read 2, Python Threading. If something is missing or I have the wrong idea, please correct me.

Here's an example of multi-threading you can play around with. Insert your queries, give it a try, and see what happens. Time it and see how long it takes on your machine, then try multiprocessing. Play around with both and see which is better and which is worse, and whether structuring the queries well plays a role, i.e., the difference between good and bad queries when doing this.

import logging, threading, time


def tFunc(name, sleep):
    logging.info("Thread %s: start", name)
    time.sleep(sleep)
    logging.info("Thread %s: finish", name)


if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO, datefmt="%H:%M:%S")
    logging.info("Main    : creating...")

    x = threading.Thread(target=tFunc, args=(1, 2,))   # name=1, sleep for 2 s
    y = threading.Thread(target=tFunc, args=(2, 15,))  # name=2, sleep for 15 s
    logging.info("Main    : going to execute start")

    x.start()
    y.start()

    logging.info("Main    : waiting...")
    logging.info("Main    : all done")

Output for the above snippet:

00:56:51: Main    : creating...
00:56:51: Main    : going to execute start
00:56:51: Thread 1: start
00:56:51: Thread 2: start
00:56:51: Main    : waiting...
00:56:51: Main    : all done // Notice Main is done and waiting on the other threads
00:56:53: Thread 1: finish
00:57:06: Thread 2: finish // Notice the time: it's because of args=(2,15,)

Some other questions: (1) Why not use multiprocessing? (2) Can it fix the problem? A minimal sketch for comparison follows below.
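
Here is a minimal multiprocessing sketch of the same toy example (pFunc and the sleep times are placeholders mirroring tFunc above, not part of the original code). Each worker runs in its own OS process, so it is not serialized by the GIL, at the cost of process startup and pickling overhead:

import logging
import time
from multiprocessing import Process

# Configured at module level so spawned child processes also log at INFO.
logging.basicConfig(format="%(asctime)s: %(message)s",
                    level=logging.INFO, datefmt="%H:%M:%S")


def pFunc(name, sleep):
    logging.info("Process %s: start", name)
    time.sleep(sleep)
    logging.info("Process %s: finish", name)


if __name__ == "__main__":
    x = Process(target=pFunc, args=(1, 2))
    y = Process(target=pFunc, args=(2, 15))
    x.start()
    y.start()
    logging.info("Main    : waiting...")
    x.join()  # unlike the threading demo above, we explicitly wait here
    y.join()
    logging.info("Main    : all done")

For I/O-bound work like a SQL fetch, threads are usually enough, since CPython releases the GIL while waiting on the network; multiprocessing tends to pay off when the bottleneck is CPU-bound post-processing.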

ziaahsan
  • I also found this, if you haven't already seen it: https://stackoverflow.com/questions/56388918/multiprocessing-multithreading-for-database-query-in-python. It might help. – ziaahsan Mar 20 '21 at 05:31

Something like this may work. Note that PyMySQL connections are not designed to be shared across threads (the driver advertises threadsafety level 1: threads may share the module, but not connections), so you may need to create a connection, or even the SSH tunnel itself, inside each thread. Under the assumption that one shared connection object works for your workload:

from multiprocessing.pool import ThreadPool

def fetch_df_target(query_args):
    return pd.read_sql_query(query_args["query"], query_args["conn"])

def run_together(queries, conn):
    query_args = [{"query": q, "conn": conn} for q in queries]
    with ThreadPool(len(queries)) as pool:
        results = pool.map(fetch_df_target, query_args)
    return tuple(results)

def fetch_3df(query1, query2, query3):
    mypkey = paramiko.RSAKey.from_private_key_file('xxxx.pem')
    sql_main_database = "xxxx"
    sql_username = "xxxx"
    sql_password = "xxxx"
    sql_hostname = "xxxx"
    sql_port = xxxx

    ssh_host = "xxxx.compute-1.amazonaws.com"
    ssh_user = "viewUser"
    ssh_port = 22
    with SSHTunnelForwarder(
            (ssh_host, ssh_port),
            ssh_username=ssh_user,
            ssh_pkey=mypkey,
            remote_bind_address=(sql_hostname, sql_port)) as tunnel:
        conn = pymysql.connect(host='127.0.0.1', user=sql_username,
                               passwd=sql_password, db=sql_main_database,
                               port=tunnel.local_bind_port)
        queries = [query1, query2, query3]
        result = run_together(queries, conn)
    return result
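
If one shared connection proves unreliable, a variant like the sketch below opens a dedicated connection per query through the same tunnel (fetch_df_own_conn, run_together_own_conns, and connect_args are illustrative names, not from the original code):

def fetch_df_own_conn(args):
    # Each worker opens its own connection, so threads never share one.
    query, connect_args = args
    conn = pymysql.connect(**connect_args)
    try:
        return pd.read_sql_query(query, conn)
    finally:
        conn.close()

def run_together_own_conns(queries, connect_args):
    with ThreadPool(len(queries)) as pool:
        return tuple(pool.map(fetch_df_own_conn,
                              [(q, connect_args) for q in queries]))

# Inside the SSHTunnelForwarder block, instead of one shared conn:
#     connect_args = dict(host='127.0.0.1', user=sql_username,
#                         passwd=sql_password, db=sql_main_database,
#                         port=tunnel.local_bind_port)
#     result = run_together_own_conns([query1, query2, query3], connect_args)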

However, you may find it useful to abstract things a bit, and utilize a class as follows (using the same fetch_df_target and run_together as above):

import time

class DataFetcher(object):

    def __init__(self, **kwargs):
        self.mypkey = paramiko.RSAKey.from_private_key_file(kwargs["mypkey"])
        self.sql_main_database = kwargs["sql_main_database"]
        self.sql_username = kwargs["sql_username"]
        self.sql_password = kwargs["sql_password"]
        self.sql_hostname = kwargs["sql_hostname"]
        self.sql_port = kwargs["sql_port"]
        self.ssh_host = kwargs["ssh_host"]
        self.ssh_user = kwargs["ssh_user"]
        self.ssh_port = kwargs["ssh_port"]

    def run_queries(self, queries):
        with SSHTunnelForwarder((self.ssh_host, self.ssh_port),
                                ssh_username=self.ssh_user,
                                ssh_pkey=self.mypkey,
                                remote_bind_address=(self.sql_hostname, self.sql_port)
                                ) as tunnel:
            conn = pymysql.connect(host='127.0.0.1',
                                   user=self.sql_username,
                                   passwd=self.sql_password,
                                   db=self.sql_main_database,
                                   port=tunnel.local_bind_port)
            result = run_together(queries, conn)
        return result

kwargs = {
    "mypkey": 'xxxx.pem',
    "sql_main_database": "xxx",
    "sql_username": "xxx",
    "sql_password": "xxx",
    "sql_hostname": "xxx",
    "sql_port": 0,
    "ssh_host": "xxx",
    "ssh_user": "xxx",
    "ssh_port": 0
}

queries = ["SELECT * FROM TABLE1", "SELECT * FROM TABLE2"]
now = time.time()
data_fetcher = DataFetcher(**kwargs)
result = data_fetcher.run_queries(queries)
print(f"Time to execute: {time.time() - now:.2f}s")

You may want to experiment a bit with both multiprocessing and multithreading to see which one yields the best results. The answer I've provided gives an example using multithreading as you've requested, but with minimal changes you could switch the same code to multiprocessing; the main caveat is that a connection object holds an open socket and cannot be pickled, so each worker process has to open its own connection. See the multiprocessing docs for more info, and the sketch below.
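
As a rough sketch of that swap (reusing the hypothetical fetch_df_own_conn per-connection worker from the earlier variant, since a shared connection cannot be handed to another process):

from multiprocessing import Pool  # instead of multiprocessing.pool.ThreadPool

def run_together_processes(queries, connect_args):
    # Same map() API as ThreadPool, but workers are separate processes, so
    # the arguments must be picklable (query strings and a plain dict are).
    # On spawn-based platforms (Windows/macOS), call this from under an
    # if __name__ == "__main__" guard.
    with Pool(len(queries)) as pool:
        return tuple(pool.map(fetch_df_own_conn,
                              [(q, connect_args) for q in queries]))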

Tristen

There are two aspects here -- whether multiple connections would help and whether the query can be sped up.

There is a bunch of overhead in creating a connection, etc. Is the 22+ seconds for 14 sequentially run queries? That is about 1.6s each. I would not expect better than 2 seconds if you could run them all in parallel. Do you have more queries than CPU cores? At some point, multiple queries simply stumble over each other rather than increase the total number of "queries per second".

The sample query is quite involved. I'll make some stabs at what might speed it up:

  • UNION means UNION DISTINCT. If there are no dups, then switch to UNION ALL, which is intrinsically faster.

  • SELECT DISTINCT also needs an extra pass to de-dup. I can't quite see where the dups might be coming from. Use of EXISTS() sometimes helps avoid dups.

  • Some possible indexes:

      k:  (status, phone, planType, test, createTime)
      h:  (activatePhone, partnerAssignedUserId)
      p:  (planId, interval)
      u:  (phone, stripeCustomerId)
    
  • Using "semijoin" (EXISTS) is likely to be better than IN( SELECT .. ), as in

                   (planId IN (  SELECT  planId
                                      FROM  Plan
                                      WHERE  planType = 1)
                                        AND  sub.subStatus = 1
                              )
      -->
                  EXISTS ( SELECT 1 FROM Plan
                              WHERE  planType = 1)
                                AND  sub.subStatus = 1
                                AND  planId = p.planId
                         )
    
  • Similarly for checking prePlanId.

  • Move AND sub.subStatus = 1 out of both sides of the OR (a combined sketch follows below).
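
Putting that last point together with the semijoin rewrite, the factored condition might look like this (untested against your schema):

    AND  sub.subStatus = 1
    AND  ( EXISTS ( SELECT 1
                      FROM  Plan
                     WHERE  planType = 1
                       AND  planId = sub.planId )
           OR
           ( EXISTS ( SELECT 1
                        FROM  Plan
                       WHERE  planType = 1
                         AND  planId = sub.prePlanId )
             AND  sub.preLiveModeEndTime > NOW() ) )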

Rick James