7

I have a SparkSQL application which returns a large number of rows that will not fit in memory, so I cannot use the `collect` function on the DataFrame. Is there a way to get all of these rows as an Iterable instead of the entire result as a list?

I am running this SparkSQL application in yarn-client mode.

mck
Sachin Janani

2 Answers

5

Generally speaking, transferring all the data to the driver is a pretty bad idea, and most of the time there is a better solution, but if you really want to go with this you can use the `toLocalIterator` method on an RDD:

val df: org.apache.spark.sql.DataFrame = ???
df.cache // Optional, to avoid repeated computation, see docs for details
val iter: Iterator[org.apache.spark.sql.Row] = df.rdd.toLocalIterator
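
You can then consume the rows one at a time on the driver without materializing the whole result. A minimal sketch (`processRow` is a placeholder for whatever per-row work you need):

// Sketch: stream the rows to the driver; only one partition is held in memory at a time.
def processRow(row: org.apache.spark.sql.Row): Unit =
  println(row.mkString(","))  // e.g. write the row to a file or an external system

iter.foreach(processRow)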
zero323
  • Kind of late to this but could you elaborate more on a few of the better solutions? – irregular Mar 02 '17 at 17:21
  • @irregular It depends on particular application but most of the time `mapPartitions`, `foreachPartition` or similar method is more than enough. Do you have any particular use case in mind? – zero323 Mar 02 '17 at 20:36
  • I'm also looking at a dataset that's too large for memory. Unfortunately I run into this error https://issues.apache.org/jira/browse/SPARK-10189 when using toLocalIterator. So I've been looking into adding ROW_NUMBER to query through the db https://paste.pound-python.org/show/sk4bPE5P9QsKhmcYlsK0/ I'm not quite sure how to set up partitions so this is how i'm going about it atm – irregular Mar 02 '17 at 20:54
  • Uhm, what exactly is this code supposed to do? :) Are you trying to do something like [this](http://stackoverflow.com/a/33878701/1560062)? – zero323 Mar 02 '17 at 21:55
  • Sorry for the lack of explanation! I am trying to get a list of (sceneid, id); the sceneid and id are organized by groups of userid -> lessonid -> videoid -> createdat time, so there are many database rows with many duplicates of those. I am trying to aggregate that data: any duplicate userid/lessonid/videoid/createdat values will result in a few sceneids which need to be deleted, so those for loops are going through the database rows and collecting that data. The process is slow, so I wonder if partitioning would speed things up. – irregular Mar 02 '17 at 21:59
  • @irregular So why not `groupBy(userid, lessonid, videoid, createdate)` and take `first(whatever-left)`. BTW Your SQL is not distributed as far as I can tell. Don't use window functions without `PARTITION BY`: https://git.io/vynqU Oh, and filters are linear. In general take a look at the question I've linked. I think this is what you want. – zero323 Mar 02 '17 at 22:17
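
As a rough illustration of the `groupBy`/`first` approach suggested in the last comment (the column names userid, lessonid, videoid, createdat and sceneid are hypothetical, taken from the discussion above, and `df` stands for the DataFrame in question):

import org.apache.spark.sql.functions.first

// Keep one sceneid per (userid, lessonid, videoid, createdat) group;
// the rows not kept here are the duplicates that would be candidates for deletion.
val deduplicated = df
  .groupBy("userid", "lessonid", "videoid", "createdat")
  .agg(first("sceneid").alias("sceneid"))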
1

Actually you can just use `df.toLocalIterator`; here is the reference in the Spark source code:

/**
 * Return an iterator that contains all of [[Row]]s in this Dataset.
 *
 * The iterator will consume as much memory as the largest partition in this Dataset.
 *
 * Note: this results in multiple Spark jobs, and if the input Dataset is the result
 * of a wide transformation (e.g. join with different partitioners), to avoid
 * recomputing the input Dataset should be cached first.
 *
 * @group action
 * @since 2.0.0
 */
def toLocalIterator(): java.util.Iterator[T] = withCallback("toLocalIterator", toDF()) { _ =>
  withNewExecutionId {
    queryExecution.executedPlan.executeToIterator().map(boundEnc.fromRow).asJava
  }
}
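
Note that in Scala the returned iterator is a `java.util.Iterator`, so you may want to convert it. A minimal sketch, assuming Spark 2.x:

import scala.collection.JavaConverters._

// As the scaladoc above notes, cache the Dataset first if it is the result of a
// wide transformation, to avoid recomputing it across the multiple jobs.
df.cache()

// toLocalIterator() returns a java.util.Iterator[Row]; asScala turns it into a Scala Iterator.
val rows: Iterator[org.apache.spark.sql.Row] = df.toLocalIterator().asScala

rows.foreach(println)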
Kehe CAI