I'm investigating an interesting case that involves wide transformations (e.g. repartition & join) on a slow RDD or dataset, e.g. the dataset defined by the following code:
val ds = sqlContext.createDataset(1 to 100)
  .repartition(1)
  .mapPartitions { itr =>
    itr.map { ii =>
      Thread.sleep(100)
      println(f"skewed - ${ii}")
      ii
    }
  }
The slow dataset is relevant because it resembles a view of a remote data source whose partition iterator is backed by a single-threaded network protocol (HTTP, JDBC, etc.). In this case the download is faster than single-threaded processing, but much slower than distributed processing.
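For concreteness, here is a rough sketch of what such a source might look like; openConnection and fetchNext are hypothetical placeholders for the real HTTP/JDBC client, not actual APIs:

// hypothetical single-connection source: one partition whose iterator lazily
// pulls records over the wire, one blocking call at a time
val remote = sc.parallelize(Seq(0), numSlices = 1).mapPartitions { _ =>
  val conn = openConnection() // placeholder: the single-threaded http/jdbc client
  Iterator
    .continually(conn.fetchNext()) // placeholder: blocking network call returning Option[Record]
    .takeWhile(_.isDefined)
    .map(_.get)
}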
Unfortunately the conventional Spark computation model won't be efficient on a slow dataset because we are confined to one of the following options:
Use only narrow transformations (flatMap-like) to pipe the stream through the data processing end-to-end in a single thread (a minimal sketch follows this list); the data processing obviously becomes a bottleneck and resource utilisation stays low.
Use a wide operation (repartitioning included) to rebalance the RDD/dataset. While this is essential for parallel processing efficiency, Spark's coarse-grained scheduler demands that the download be fully completed before the downstream stage can start, so the download becomes another bottleneck.
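For reference, option 1 would look roughly like the following; process is just a stand-in for the real per-record work, not an actual function:

// option 1: narrow-only pipeline, download and processing share a single thread per partition
val narrowOnly = ds.map { ii =>
  process(ii) // placeholder for the actual (expensive) per-record processing
}
narrowOnly.foreach { _ => }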
Experiment
The following program represents a simple simulation of such case:
val mapped = ds
val mapped2 = mapped
  .repartition(10)
  .map { ii =>
    println(f"repartitioned - ${ii}")
    ii
  }
mapped2.foreach { _ => }
When executing the above program, it can be observed that no println(f"repartitioned - ${ii}") line appears before every println(f"skewed - ${ii}") line has been printed: the repartition introduces a shuffle, so the upstream stage running the slow iterator must finish completely before the downstream stage in the RDD dependency is scheduled.
I'd like to instruct the Spark scheduler to start distributing/shipping data entries generated by the partition iterator before its task completes (through a mechanism like micro-batching or streaming). Is there a simple way of doing this? E.g. converting the slow dataset into a structured stream would be nice, but alternatives that are better integrated would also work.
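To illustrate the kind of behaviour I'm after, here is a rough sketch of micro-batching driven from the driver side; this is not a real Spark feature, the chunk size of 10 is arbitrary, and it also parallelises the downloads themselves, which a strictly single-connection source may not allow:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// split the input into small chunks and submit one job per chunk, so that
// downstream processing of earlier chunks can overlap with the download of later chunks
val jobs = (1 to 100).grouped(10).toSeq.map { chunk =>
  Future {
    sc.parallelize(chunk, numSlices = 1)
      .map { ii => Thread.sleep(100); ii } // simulated slow download
      .repartition(10)
      .map { ii => println(f"repartitioned - $ii"); ii }
      .foreach { _ => }
  }
}
Await.result(Future.sequence(jobs), Duration.Inf)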
Thanks a lot for your opinion
UPDATE: to make your experimentation easier, I have appended my Scala tests, which can be run out of the box:
package com.tribbloids.spookystuff.spike

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{FunSpec, Ignore}

@Ignore
class SlowRDDSpike extends FunSpec {

  lazy val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  lazy val sc: SparkContext = spark.sparkContext
  lazy val sqlContext: SQLContext = spark.sqlContext

  import sqlContext.implicits._

  describe("is repartitioning non-blocking?") {

    it("dataset") {
      val ds = sqlContext
        .createDataset(1 to 100)
        .repartition(1)
        .mapPartitions { itr =>
          itr.map { ii =>
            Thread.sleep(100)
            println(f"skewed - $ii")
            ii
          }
        }

      val mapped = ds
      val mapped2 = mapped
        .repartition(10)
        .map { ii =>
          Thread.sleep(400)
          println(f"repartitioned - $ii")
          ii
        }

      mapped2.foreach { _ => }
    }

    it("RDD") {
      val ds = sc
        .parallelize(1 to 100)
        .repartition(1)
        .mapPartitions { itr =>
          itr.map { ii =>
            Thread.sleep(100)
            println(f"skewed - $ii")
            ii
          }
        }

      val mapped = ds
      val mapped2 = mapped
        .repartition(10)
        .map { ii =>
          Thread.sleep(400)
          println(f"repartitioned - $ii")
          ii
        }

      mapped2.foreach { _ => }
    }
  }
}
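To actually run the suite, remove the @Ignore annotation and invoke it directly (e.g. sbt "testOnly *SlowRDDSpike"); this assumes a standard sbt project with spark-sql and scalatest on the test classpath.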