
I am reading many images and I would like to work on a tiny subset of them during development. As a result, I am trying to understand how I could make that happen:

In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'

In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43

In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...

So what is happening? I would expect the limit() version to run much faster than [2], but that's not the case*.

Below I will describe my understanding, and please correct me, since obviously I am missing something:

  1. d is an RDD of pairs (I know that from the schema), and with the map function in [2] I am saying:

    i) Take every pair (which will be named x) and give me back its photo_id attribute.

    ii) That results in a new (anonymous) RDD, on which we call the first() method. I am not sure how first() works$, but it should give me the first element of that anonymous RDD.

  2. In [3], we limit the d RDD to 1, which means that although d has many elements, only 1 is used and the map function is applied to that one element only. Out[3] should be the RDD created by the mapping.

  3. In [4], I would expect it to follow the logic of [3] and just print the one and only element of the limited RDD...

As expected, after looking at the monitor, [4] seems to process the whole dataset, while the others don't, so it seems that I am not using limit() correctly, or that it is not what I am looking for:

[image: screenshot of the Spark monitor]


Edit:

tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()

The first line gives a PipelinedRDD which, as described here, does not perform any action, only a transformation.

However, the second line also processes the whole dataset (as a matter of fact, the number of tasks is now the same as before, plus one!).


*[2] executed instantly, while [4] is still running after more than 3 hours.

$I couldn't find it in the documentation, because of the name.

gsamaras
  • I'm not sure about the execution time, but `sample()` gives you multiple data points. `first()`, as you can tell, just gives the first record. – OneCricketeer Aug 02 '16 at 02:09
  • @cricket_007 what does the "1st record" mean for Spark? Maybe it needs to process the whole dataset to determine that...However that wouldn't explain why `[3]` executed instantly. For the `sample()`, do you have something like [this](http://stackoverflow.com/questions/24806084/sampling-a-large-distributed-data-set-using-pyspark-spark) in mind? – gsamaras Aug 02 '16 at 02:15
  • `first()` is mostly a shortcut to `take(1)`, and you can read what [`take()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take) means. And yes, that link is exactly what I was referring to since you said "tiny subset". Yes 1 element is a subset, but you might want more than that :) – OneCricketeer Aug 02 '16 at 02:19
  • Hmm, could that be the answer then @cricket_007? I mean could `take()` be the problem, rather than `first()`? In the documentation it says: `Note that this method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.`. But I am not sure if that's the case here... Well surely not, since `[2]` executed instantly! Also check my interesting edit. – gsamaras Aug 02 '16 at 02:37
  • Right - about the memory and the transformation vs. action. Did you not know that Spark lazily evaluates? When it says `PythonRDD`, for example, that is just the type of result, you don't actually have that data until you do an action like `first()` or `collect()` – OneCricketeer Aug 02 '16 at 02:44
  • @cricket_007 yes I know, but yet, I would expect `[2]` to be slower than `[4]`. – gsamaras Aug 02 '16 at 02:47
  • Simply because of the map over all records? Yeah, I would expect that as well... – OneCricketeer Aug 02 '16 at 02:50
  • http://apache-spark-developers-list.1001551.n3.nabble.com/What-happens-in-Dataset-limit-followed-by-rdd-td18526.html – zero323 Aug 02 '16 at 10:05

1 Answer


Based on your code, here is a simpler test case on Spark 2.0:

case class my (x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line

Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so check the physical plan of the two cases:

scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
   +- *MapElements <function1>, obj#123: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
         +- Scan ExistingRDD[x#74]

scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
   +- *MapElements <function1>, obj#130: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
         +- *GlobalLimit 1
            +- Exchange SinglePartition
               +- *LocalLimit 1
                  +- Scan ExistingRDD[x#74]

The first case benefits from an optimisation in the CollectLimitExec physical operator. It first fetches only the first partition to get the requested number of rows (1 in this case). If that is not enough, it fetches more partitions until the limit is reached. So generally, if the first partition is not empty, only the first partition is computed and fetched. The other partitions are not even computed.
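
To make the first case concrete, here is a small plain-Python sketch of the idea. It is not Spark code: `LazyPartitions` and `collect_limit` are hypothetical names made up for this illustration, and the loop goes one partition at a time, which is a simplification of whatever batching the real operator does.

```python
class LazyPartitions:
    """Toy stand-in for a partitioned dataset; not a Spark API."""

    def __init__(self, rows, num_partitions):
        size = -(-len(rows) // num_partitions)  # ceiling division
        self._chunks = [rows[i:i + size] for i in range(0, len(rows), size)]
        self.computed = 0  # how many partitions were actually evaluated

    def __len__(self):
        return len(self._chunks)

    def compute(self, index):
        """Evaluate one partition and record that work was done."""
        self.computed += 1
        return list(self._chunks[index])


def collect_limit(parts, n):
    """Fetch partitions one at a time until n rows are gathered."""
    out = []
    for i in range(len(parts)):
        if len(out) >= n:
            break  # limit satisfied, remaining partitions are never touched
        out.extend(parts.compute(i)[: n - len(out)])
    return out


# Same shape as the test case above: 10000 rows in 1000 partitions.
parts = LazyPartitions(list(range(10000)), 1000)
first_row = collect_limit(parts, 1)
print(first_row, parts.computed)
```

With a non-empty first partition, only one of the 1000 toy partitions is evaluated, which is why the `df1` case returns almost immediately.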

However, in the second case, the optimisation in CollectLimitExec does not help, because the earlier limit operation involves a shuffle. All partitions are computed, LocalLimit(1) runs on each partition to get 1 row, and then all partitions are shuffled into a single partition. CollectLimitExec then fetches 1 row from the resulting single partition.
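
The second case can be sketched the same way in plain Python. Again, this is only a toy model and not Spark code: `compute_partition` and `limit_then_first` are hypothetical helpers, and the comments map each step to the operator names from the physical plan. As far as I understand, real Spark may stop early inside each partition because rows are pulled through iterators, but a task is still scheduled for every partition.

```python
def compute_partition(index, rows_per_partition, log):
    """Toy partition evaluation; appends to log so the work is visible."""
    log.append(index)
    start = index * rows_per_partition
    return list(range(start, start + rows_per_partition))


def limit_then_first(num_partitions, rows_per_partition, n, log):
    # LocalLimit: every partition is evaluated and cut down to n rows.
    local = [compute_partition(i, rows_per_partition, log)[:n]
             for i in range(num_partitions)]
    # Exchange SinglePartition: all surviving rows move into one partition.
    single = [row for part in local for row in part]
    # GlobalLimit, followed by the final one-row fetch of the collect step.
    return single[:n][:1]


log = []
row = limit_then_first(num_partitions=1000, rows_per_partition=10, n=1, log=log)
print(row, len(log))
```

The result is the same single row, but all 1000 toy partitions had to be touched before the shuffle could produce it, which matches the task count you see in the monitor.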

Sun Rui
  • Thanks Sun Rui! However, something is still not clear to me, you said that `first` is equivalent to `limit(1).collect`, but in the explanation phase you use `limit(1)` only, why? – gsamaras Aug 04 '16 at 17:14
  • This is quite old but I'll give it a shot: Sun Rui- **why** does this second case creates a shuffle and why does it result in a single partition? – Vitaliy Jul 22 '18 at 16:49