When you do
val df = (1 to 8).toSeq.toDF.repartition(8)
This will not create 8 partitions with 1 record each. If you inspect this DataFrame (see e.g. https://stackoverflow.com/a/46032600/1138523, or the sketch right after the table), you get:
+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|               0|                0|
|               1|                0|
|               2|                0|
|               3|                0|
|               4|                0|
|               5|                0|
|               6|                4|
|               7|                4|
+----------------+-----------------+
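A minimal sketch of one way to obtain such a per-partition count (mapPartitionsWithIndex over the underlying RDD; the column names are simply chosen to match the output above):

// assumes a SparkSession named `spark` and `import spark.implicits._` in scope
val df = (1 to 8).toSeq.toDF.repartition(8)

df.rdd
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) } // (partition index, record count)
  .toDF("partition_number", "number_of_records")
  .show()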
So you will have only 2 non-empty partitions, and therefore at most 2-fold parallelism (I've asked about this here: How does Round Robin partitioning in Spark work?).
To get equal-sized partitions, it's better to use
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
instead of
(1 to 8).toSeq.toDF.repartition(8).rdd
The first option gives you exactly 1 record per partition, whereas the second does not, because it uses round-robin partitioning.
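A quick way to check this yourself is to look at the per-partition sizes with glom (same spark session and implicits assumed as above):

val viaParallelize = spark.sparkContext.parallelize((0 to 7), numSlices = 8)
println(viaParallelize.glom().map(_.length).collect().mkString(", "))
// 1, 1, 1, 1, 1, 1, 1, 1  -> one record per partition

val viaRepartition = (1 to 8).toSeq.toDF.repartition(8).rdd
println(viaRepartition.glom().map(_.length).collect().mkString(", "))
// e.g. 0, 0, 0, 0, 0, 0, 4, 4  -> matches the table above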
As a side note: when you do x.foreach, x will be consumed (iterators are only traversable once), so if you return x afterwards you will always get back an empty iterator.
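You can see this with plain Scala, outside of Spark:

val it = Iterator(1, 2, 3)
it.foreach(println)   // prints 1, 2, 3 and consumes the iterator
println(it.hasNext)   // false -- returning `it` here would give an empty iterator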
So your final code can look like this:
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
  .mapPartitions(x => {
    val xL = x.toList // materialize the iterator so it can be traversed again
    assert(xL.size == 1) // make sure the partition holds exactly 1 record
    val conn = createConnection()
    xL.foreach { s => // the queries of the different partitions run concurrently
      execute(s"SELECT * FROM myTable WHERE col = ${s}")
    }
    conn.close()
    xL.toIterator
  })
  .collect // trigger all queries
Instead of using mapPartitions (which is lazy), you could also use foreachPartition, which is non-lazy (it's an action).
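If you go that route, a sketch could look like this (using the same createConnection()/execute() helpers as above; no collect is needed since foreachPartition is itself an action):

spark.sparkContext.parallelize((0 to 7), numSlices = 8)
  .foreachPartition { part =>
    val conn = createConnection()
    part.foreach { s =>
      execute(s"SELECT * FROM myTable WHERE col = ${s}")
    }
    conn.close()
  }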
And since you have only 1 record per partition anyway, iterating over the partition isn't really beneficial; you could just use a plain foreach:
spark.sparkContext.parallelize((0 to 7), numSlices = 8)
  .foreach { s =>
    val conn = createConnection()
    execute(s"SELECT * FROM myTable WHERE col = ${s}")
    conn.close()
  }