I have around 70 hive queries which I am executing in pyspark in sequence. I am looking at ways to improve the runtime by running the hive queries in parallel. I am planning to do this by creating python threads and running sqlContext.sql in the threads. Would this create threads in the driver and improve performance?
-
Possible duplicate of [How to read and write multiple tables in parallel in Spark?](https://stackoverflow.com/questions/32192893/how-to-read-and-write-multiple-tables-in-parallel-in-spark) – Raghavendra Gupta Jan 29 '19 at 13:56
-
Possible duplicate of [How to run independent transformations in parallel using PySpark?](https://stackoverflow.com/questions/38048068/how-to-run-independent-transformations-in-parallel-using-pyspark) – user10938362 Jan 29 '19 at 14:40
1 Answer
I am assuming you do not have any dependencies between these hive queries, so they can run in parallel. You can accomplish this with threading, but I am not sure of the benefit in a single-user application: the total number of resources in your cluster is fixed, so the total time to finish all the queries will be the same, because the Spark scheduler will round-robin across these individual jobs when you multi-thread them.
https://spark.apache.org/docs/latest/job-scheduling.html explains this:
1. Spark by default uses a FIFO scheduler (which is what you are observing).
2. By threading you can use the "fair" scheduler instead.
3. In the method that is being threaded, call sc.setLocalProperty("spark.scheduler.pool", <pool_id>).
4. The pool id needs to be different for each thread.
Example use case of threading from a code perspective:
# set the spark context to use the FAIR scheduler mode
import threading
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# data frames produced by each thread, keyed by pool id
# (threading.Thread.join() returns None, so results are collected here instead)
results = {}

# runs a query, given the spark context, a scheduler pool id and the query text
def runQuery(sc, pool_id, query):
    sc.setLocalProperty("spark.scheduler.pool", pool_id)
    results[pool_id] = sqlContext.sql(query)

t1 = threading.Thread(target=runQuery, args=(sc, "1", <query1>))
t2 = threading.Thread(target=runQuery, args=(sc, "2", <query2>))

# start the threads...
t1.start()
t2.start()

# wait for the threads to complete, then read the returned data frames...
t1.join()
t2.join()
df1 = results["1"]
df2 = results["2"]
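Since you have around 70 queries, spawning and joining one thread per query by hand gets verbose. A minimal sketch of the same idea using a thread pool, assuming the sc and sqlContext created above and a hypothetical Python list named queries holding the HiveQL strings:

from concurrent.futures import ThreadPoolExecutor

# run a single query in its own fair-scheduler pool and return the data frame
def run_in_pool(args):
    pool_id, query = args
    sc.setLocalProperty("spark.scheduler.pool", pool_id)
    return sqlContext.sql(query)

# queries is a hypothetical list of ~70 HiveQL strings; max_workers caps concurrency
with ThreadPoolExecutor(max_workers=8) as executor:
    dfs = list(executor.map(run_in_pool, [(str(i), q) for i, q in enumerate(queries)]))

executor.map preserves the input order, so dfs[i] corresponds to queries[i]; max_workers simply limits how many jobs are submitted to the scheduler at once.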
As the Spark documentation indicates, you will not observe an improvement in overall throughput; fair scheduling is suited for multi-user sharing of resources. Hope this helps.

-
Thanks for answering. Can you please explain multi-user sharing of resources? There can be other spark jobs submitted to the cluster while this program is running. Does that make it a multi-user environment? – Ananth Gopinath Jan 29 '19 at 15:26
-
The question that needs to be answered is whether the FIFO scheduler is good enough, i.e., as more clients submit jobs, is it OK that they wait until the current job completes? If you want a fair allocation of resources and round-robin scheduling for jobs submitted to the cluster, then you need to support multiple clients (multiple users). The HDP Hadoop framework allows for queues tied to clients, and each client can have a different capacity (number of resources): https://docs.hortonworks.com/HDPDocuments/HDP3/HDP-3.0.0/data-operating-system/content/manage_cluster_capacity_with_queues.html – all-things-cloud Jan 29 '19 at 15:52
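For illustration, on YARN a Spark application can be pointed at a named capacity queue via the spark.yarn.queue setting; the queue name "analytics" below is a hypothetical example, not something from this thread.

from pyspark import SparkConf, SparkContext

# a minimal sketch: submit this application to a specific YARN capacity queue
# ("analytics" is a hypothetical queue name configured by the cluster admin)
conf = SparkConf().setAppName("hive-queries").set("spark.yarn.queue", "analytics")
sc = SparkContext(conf=conf)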