3

I d'like to run a SQL query in parallel and be able to control the level of parallelism to 8 queries. Right now, I am doing this piece of code. The idea is to create 8 partition and allow executors to run them in parallel.

  (1 to 8).toSeq.toDF.repartition(8) // 8 partitions
  .rdd.mapPartitions(
  x => {
  val conn = createConnection()
    x.foreach{
      s => { // expect the below query be run concurently
      execute(s"SELECT * FROM myTable WHERE col = ${s.get(0)}")
      }
    }
  conn.close()
  x
  }).take(1)

The problem is the 8 queries are run one by one.

How should I proceed to get queries run 8 by 8 ?

parisni
  • 920
  • 7
  • 20
  • It should be okay if you are having enough resources to run 8 jobs in parallel (at least 8 threads in executors). You can also try executing the sql using udf. – Apurba Pandey Jan 09 '19 at 06:52

1 Answers1

6

When you do

val df = (1 to 8).toSeq.toDF.repartition(8)

This will not create 8 partitions with 1 record each. If you inspect this dataframe (see e.g. https://stackoverflow.com/a/46032600/1138523), then you get :

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|               0|                0|
|               1|                0|
|               2|                0|
|               3|                0|
|               4|                0|
|               5|                0|
|               6|                4|
|               7|                4|
+----------------+-----------------+

So you will have only 2 partitions which are non empty, therefore you will have at max 2-fold parallelism (I've asked about this here : How does Round Robin partitioning in Spark work?)

To make equal-sized partitions, you better use

spark.sparkContext.parallelize((0 to 7), numSlices = 8)

instead of

(1 to 8).toSeq.toDF.repartition(8).rdd

The first option gives you 1 record per partition, the second one not as it uses round robin partitioning

As a side note, when you do x.foreach, then x will be consumed (Iterators are only traversable once) so if you return x you will always get an empty iterator.

So your final code can look like this :

 spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.mapPartitions(
  x => {
  val xL = x.toList  // convert to List
  assert(xL.size==1) // make sure partition has only 1 record

  val conn = createConnection()
    xL.foreach{
      s => { // expect the below query be run concurently
      execute(s"SELECT * FROM myTable WHERE col = ${s}")
      }
    }
  conn.close()
  xL.toIterator
  })
 .collect // trigger all queries

Instead of using mapPartitions (which is lazy), you could also use foreachPartition, which is non-lazy

As you have only 1 record per partition, iterating the partitions isn't really beneficial, you could also just use a plain foreach:

 spark.sparkContext.parallelize((0 to 7), numSlices = 8)
.foreach( s=> {
  val conn = createConnection()
  execute(s"SELECT * FROM myTable WHERE col = ${s}")   
  conn.close()
})
Raphael Roth
  • 26,751
  • 15
  • 88
  • 145
  • can you please tell a bit what would be the benefit of using foreachPartition over mapPartitions ? – parisni Jan 09 '19 at 11:11
  • Your method has been working when I use *ForeachPartition* only. – parisni Jan 09 '19 at 13:33
  • @parisni are you sure you tried `mapPartitions` followed by `collect`? `count` should also do the trick if you don't need the return values – Raphael Roth Jan 09 '19 at 19:57
  • I mean *mapPartitions* works. It just didn't distribute as well as foreachPartition. Maybe related to lazy aspects – parisni Jan 09 '19 at 20:29
  • I was using take(1), maybe that makes sense and I should have test with collect. Anyway thanks for your help. – parisni Jan 10 '19 at 11:43