
Environment

  • Scala
  • Apache Spark: Spark 2.2.1
  • EMR on AWS: emr-5.12.1

Content

I have one large DataFrame, like below:

val df = spark.read.option("basePath", "s3://some_bucket/").json("s3://some_bucket/group_id=*/")

There are ~1 TB of JSON files under s3://some_bucket, split across 5000 group_id partitions. I want to run a conversion using SparkSQL, and the conversion differs for each group_id.

The Spark code is like below:

// Create view
val df = spark.read.option("basePath", "s3://data_lake/").json("s3://data_lake/group_id=*/")
df.createOrReplaceTempView("lakeView")

// one of queries like this:
// SELECT 
//   col1 as userId,
//   col2 as userName,
//   .....
// FROM
//   lakeView
// WHERE
//   group_id = xxx;
val queries: Seq[String] = getGroupIdMapping

// ** Want to know better ways **
queries.par.foreach(query => {
  val convertedDF: DataFrame = spark.sql(query)
  convertedDF.write.save("s3://another_bucket/")
})

The par collection runs up to Runtime.getRuntime.availableProcessors queries concurrently, which corresponds to the number of the driver's cores.

But it seems awkward and not efficient enough, because this has nothing to do with Spark's own parallelization.

I really want to do something like groupBy in scala.collection.Seq.

This is not valid Spark code:

df.groupBy(groupId).foreach((groupId, parDF) => {
  parDF.createOrReplaceTempView("lakeView")
  val convertedDF: DataFrame = spark.sql(queryByGroupId)
  convertedDF.write.save("s3://another_bucket")
})

1 Answer


1) First of all, if your data is already stored in files per group id, there is no reason to mix it all together and then group by id using Spark. It's much simpler and more efficient to load only the relevant files for each group id, as in the sketch below.
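
A minimal sketch of that idea, assuming the layout from the question (s3://data_lake/group_id=<id>/); queryFor(id) is a hypothetical helper returning the per-group SQL, and the per-group output paths are also an assumption:

val groupIds: Seq[String] = ???  // the 5000 group ids to process

groupIds.foreach { id =>
  // read only the files belonging to this group id; keeping basePath
  // preserves group_id as a partition column for the WHERE clause
  val groupDF = spark.read.option("basePath", "s3://data_lake/").json(s"s3://data_lake/group_id=$id/")
  groupDF.createOrReplaceTempView("lakeView")
  // run the per-group query against just this slice and write it out
  spark.sql(queryFor(id)).write.save(s"s3://another_bucket/group_id=$id/")
}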

2) Spark itself parallelizes the computation, so in most cases there is no need for external parallelization. But if you feel that Spark doesn't utilize all resources, check which case you are in:

a) each individual computation takes less than a few seconds; then task scheduling overhead is comparable to task execution time, so it's possible to get a boost by running a few jobs in parallel.

b) the computation takes a significant amount of time, but resources are still underutilized; then most probably you should increase the number of partitions of your dataset (see the sketch below).
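
For 2b), a minimal sketch; the partition count 2000 is an arbitrary example and needs tuning to your data volume and cluster size (df is the DataFrame from the question):

// raise the shuffle parallelism used by SQL queries
spark.conf.set("spark.sql.shuffle.partitions", "2000")
// or repartition the DataFrame explicitly before the expensive stage
val repartitionedDF = df.repartition(2000)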

3) If you finally decide to run several jobs in parallel, it can be achieved this way:

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// a fixed-size thread pool caps how many Spark jobs are submitted concurrently
val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map { query =>
  Future {
    //spark stuff here, e.g. spark.sql(query).write.save(...)
    0
  }(ec)
}
val allDone: Future[Seq[Int]] = Future.sequence(results)
//wait for results
Await.result(allDone, Duration.Inf)
executor.shutdown() //otherwise the JVM will probably not exit
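
Applied to the queries from the question, a usage sketch might look like this (it reuses ec and executor from above; the single output path is taken from the question and would likely become a per-group path in practice). Note that Await.result only blocks driver threads; the jobs themselves still run on the executors.

val queries: Seq[String] = getGroupIdMapping
val jobs: Seq[Future[Unit]] = queries.map { query =>
  Future {
    // each Future submits one independent Spark job from its own driver thread
    spark.sql(query).write.save("s3://another_bucket/")
  }(ec)
}
Await.result(Future.sequence(jobs), Duration.Inf)
executor.shutdown()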
  • Nice example. If I make this as a utility function which accept an array of queries and submit in Future using sqlContext.sql(query) then how can I return the resulting DF array as an array of DataFrames? – Devas May 08 '18 at 07:42
  • @Krishas not sure that an array of `DataFrame`s is what you really want. `DataFrame` is not a result of a computation, just a description. Anyway, I think it's offtopic and it's not convenient to paste code samples in comments. If you like you can create a separate question. – simpadjo May 08 '18 at 08:37
  • I just want to return the DataFrames to the caller. I know it is off topic here, anyway is it doable? – Devas May 08 '18 at 09:53
  • @Krishas if you want to just return a few DataFrames without actually computing them you'll most probably not gain any boost from parallelism. But of course it's doable. – simpadjo May 08 '18 at 10:18
  • In this case (also mine) the DFs are query outputs, which we can consider a computational output? Why can't I gain any boost from parallelism here? Anyway, each query is running in parallel..!!! – Devas May 08 '18 at 10:26
  • Because the original code contains `action` on a dataframe (`convertedDF.write.save("s3://another_bucket")`). No real computation happens unless you call an `action` on a Dataframe/Dataset/RDD. Please read about the difference between `transformations` and `actions` in Spark. – simpadjo May 08 '18 at 11:12
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/170649/discussion-between-krishas-and-simpadjo). – Devas May 09 '18 at 06:23