
Is there an RDD method like take that does not pull all the elements into memory? For example, I may need to take 10^9 elements of my RDD and keep them as an RDD. What is the best way to do that?

EDIT: A solution could be to zipWithIndex and filter with index < aBigValue, but I am pretty sure there is a better solution.

EDIT 2: The code would look like

sc.parallelize(1 to 100, 2).zipWithIndex().filter(_._2 < 10).map(_._1)

That is a lot of operations just to reduce the size of an RDD :-(

Benjamin
  • You have 1,000,000,000 elements in memory? – Yuval Itzchakov Jun 01 '16 at 09:29
  • No, I have more than 10⁹ elements in my RDD, and sometimes I may need to reduce the size of this RDD to something like 10⁹ – Benjamin Jun 01 '16 at 09:35
  • What's the issue with the zipWithIndex solution? It does exactly what's required. – Preeti Khurana Jun 01 '16 at 09:56
  • I thought it was not optimal – Benjamin Jun 01 '16 at 09:59
  • It looks complex, but with strict requirements you cannot really do better than that. – zero323 Jun 01 '16 at 10:14
  • When you call zipWithIndex, I think Spark does lazy processing and does not really do anything here. Only when you apply the next transformation/action will it act, and only on those records which meet the criteria. So it's not unoptimal. Another plus point: you will get the exact records in the exact sequence (unless you are doing group-by operations before it). – Preeti Khurana Jun 01 '16 at 10:25
  • Great, good news. Thank you Preeti – Benjamin Jun 01 '16 at 10:27
  • @PreetiKhurana That is not exactly true. `zipWithIndex` requires a separate action to obtain partition statistics. – zero323 Jun 01 '16 at 10:33
  • Yeah, I just cross-checked (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-transformations.html#zipWithIndex) and it does call another job. I am not able to edit my previous comment. In that case the OP can look at the Spark code for how the indexes are created in the zipWithIndex function and implement the same functionality combined with his filter in a single pass; it might work in an optimized manner. – Preeti Khurana Jun 01 '16 at 10:38
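The comment thread's last idea (reuse the partition-counting trick that `zipWithIndex` performs internally, but skip materializing an index for every element) can be sketched in pure Scala, with partitions modeled as plain `Seq`s. In Spark the same per-partition logic would run via `mapPartitionsWithIndex` after one counting job; `takeFirstN` here is a hypothetical helper, not part of the Spark API:

```scala
// Sketch: take the first n elements across ordered "partitions".
// Step 1: count each partition (in Spark, this is the extra job
//         zipWithIndex also runs).
// Step 2: prefix-sum the counts to get the global index of each
//         partition's first element.
// Step 3: take only the still-needed slice from each partition.
def takeFirstN[T](partitions: Seq[Seq[T]], n: Long): Seq[Seq[T]] = {
  val counts = partitions.map(_.size.toLong)
  val starts = counts.scanLeft(0L)(_ + _) // starts(i) = global index of partition i's head
  partitions.zipWithIndex.map { case (part, i) =>
    val remaining = n - starts(i)         // how many elements we still need from here on
    if (remaining <= 0) Seq.empty[T]
    else part.take(math.min(remaining, part.size.toLong).toInt)
  }
}

val parts = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
val firstFive = takeFirstN(parts, 5).flatten
// firstFive == Seq(1, 2, 3, 4, 5)
```

Compared with zipWithIndex + filter, this avoids tagging every element with an index, but it still needs the same counting pass, so the total number of jobs is comparable.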

2 Answers


I actually quite like the zipWithIndex + filter mechanism, but if you are looking for an alternative that is sometimes much faster, I would suggest the sample function, as described here: https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/rdd/RDD.html

data.count
// res1: Long = 1000
val result = data.sample(false, 0.1, System.currentTimeMillis().toInt)
result.count
// res2: Long = 100

sample takes the whole RDD, subsets it by a fraction, and returns the result as another RDD. The problem is that if you are looking for exactly 150 samples from 127112310274 data rows, good luck writing that fraction parameter (you could try 150 / data.count). But if you are roughly looking for a tenth of your data, this function works much faster than take/drop or zipWithIndex and filter.
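One way to get an exact count out of sample is to oversample slightly and then trim with zipWithIndex + filter. A minimal sketch of the fraction calculation; `safeFraction` and its `margin` parameter are hypothetical, not Spark API, and `data`/`seed` in the commented Spark lines are assumed to be in scope:

```scala
// Hypothetical helper: pick a sampling fraction with a safety margin so
// that sample(false, fraction) rarely returns fewer than `wanted` rows.
// The oversampled result is then trimmed to the exact size.
def safeFraction(wanted: Long, total: Long, margin: Double = 1.1): Double =
  math.min(1.0, (wanted.toDouble / total) * margin)

// e.g. for exactly 150 rows out of 127112310274:
val f = safeFraction(150L, 127112310274L)
// In Spark (sketch, not executed here):
// val exact = data.sample(false, f, seed)
//               .zipWithIndex().filter(_._2 < 150).map(_._1)
```

Since the sample is tiny relative to the full RDD, the extra job that zipWithIndex runs over it is cheap; the trade-off is that with very small probability the sample comes up short and has to be redrawn.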

GameOfThrows
  • Yes, I want an exact number of values. – Benjamin Jun 01 '16 at 10:01
  • @Benjamin Then it is probably faster to use the zipWithIndex and filter method; the alternative is Spark's takeSample, but that also returns an Array, which does not help your case. You could try setting the fraction parameter to (numberOfSamples / data.count) and see if that works. – GameOfThrows Jun 01 '16 at 10:15
  • I don't think sample will give you the results in the right sequence (as it was in the original data) either. – Preeti Khurana Jun 01 '16 at 10:27
  • @PreetiKhurana I do not think the OP required ordering; I was under the impression that he wanted to take some samples for further processing as an RDD without collecting. Also, it is generally bad to say that an RDD has the right sequence; only an Array zipped with an index and parallelized to an RDD can have the right sequence. – GameOfThrows Jun 01 '16 at 10:30
  • @PreetiKhurana sample doesn't reorder data http://stackoverflow.com/a/32231089/1560062 – zero323 Jun 01 '16 at 10:36
  • @zero323: The OP compared the functionality with the take function, and take(10) returns the same records on the same RDD, while sample will not behave that way. – Preeti Khurana Jun 01 '16 at 10:45

A solution:

yourRDD.zipWithIndex().filter(_._2 < ExactNumberOfElements).map(_._1)

If you want an approximation, take GameOfThrows' solution.

Benjamin