2

I use Spark 2.1.1. I do many joins and selects on an input DS (inputDs) in a loop by hour it looks like this:

val myDs =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
}).reduce(_.union(_))

def getDsForOneHour(ds: Dataset[I], year:Int, month:Int, day:Int, hour: Int)(implicit sql: SQLImplicits):Dataset[I]= {
ds.where(col("year") === year and col("month") ===  month and col("day") ===  day and col("hour") === hour)
}

I run that code using spark-testing-base and it takes about 3 minutes to complete operations for one month (~30*24 unions&selects). These are all lazy operations I'm wondering why it takes so much time Spark to build myDs ?

Marious
  • 143
  • 1
  • 11
  • That's the cost of computing execution plan. [Spark unionAll multiple dataframes](https://stackoverflow.com/q/37612622) should help you. – Alper t. Turker May 21 '18 at 14:55

1 Answers1

2

I guess its slow because the execution plan is updated for every new dataset unioned in the loop. You could rewrite your code to build up the filter first:

def getFilterForOneHour(year:Int, month:Int, day:Int, hour: Int): Column = {
  col("year") === year and col("month") ===  month and col("day") ===  day and col("hour") === hour
} 


val myFilter =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
getFilterForOneHour(next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
}).reduce(_ or _)

val myDs = inputDs.where(myFilter)

EDIT: What you can also try is to do a group-wise union (my case with a batch-size of 50). I've run some tests wich a dummy in-memory dataset, and this improved performance by a factor of 8 in my case:

val myDs =  Iterator.iterate(fromDate)(_.plus(ofHours(1))).takeWhile(_.isBefore(toDate)).map(next => {
getDsForOneHour(inputDs, next.getYear, next.getMonthValue, next.getDayOfMonth, next.getHour)
})
.grouped(50).map(dss => dss.reduce(_ union _))
.reduce(_ union _)
Raphael Roth
  • 26,751
  • 15
  • 88
  • 145
  • That's a clever solution but I'm afraid it may not create right pushdown filters to my underlaying CassandraDB storage PK(year,month,day). But definitely I will check that solution against DB. – Marious May 21 '18 at 18:01
  • interesting ! I also noticed that union works quite fast for a moment until it reaches X iterations and after that it dramatically slows down. Having it grouped it means small chunks to union and finally speed up for the whole computation. I've also checked DAGs generated for that grouped unions and they are exactly the same as for my original version. Seems like Spark has some internal issue with handling many unions at once, probably should be reported as something to improve in the future Spark releases. I really appreciate that help. – Marious May 21 '18 at 21:20