43

I've got a big RDD (1 GB) in a YARN cluster. On the local machine that uses this cluster I have only 512 MB. I'd like to iterate over the values of the RDD on my local machine. I can't use collect(), because it would create an array locally that is bigger than my heap. I need some iterative approach. There is the iterator() method, but it requires additional information that I can't provide.

UPD: committed the toLocalIterator method

epahomov

6 Answers

46

Update: the RDD.toLocalIterator method, which appeared after the original answer was written, is a more efficient way to do the job. It uses runJob to evaluate only a single partition on each step.
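
For reference, a minimal usage sketch (assuming an existing SparkContext named sc; the sample RDD here is made up for illustration):

// Hypothetical example: iterate over a large RDD on the driver, one partition at a time.
// Only one partition's worth of data is held in driver memory at any moment.
val rdd = sc.parallelize(1 to 1000000, numSlices = 100)
rdd.toLocalIterator.foreach { value =>
  // Process each value on the driver: write it to a file, feed it to another process, etc.
  println(value)
}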

TL;DR: the original answer below might still give a rough idea of how it works:

First of all, get the array of partitions:

val parts = rdd.partitions

Then create smaller RDDs by filtering out everything but a single partition. Collect the data from the smaller RDDs and iterate over the values of a single partition:

for (p <- parts) {
    val idx = p.index
    val partRdd = rdd.mapPartitionsWithIndex(
        (index, it) => if (index == idx) it else Iterator(), true)
    // The second argument is true to avoid rdd reshuffling
    val data = partRdd.collect // data contains all values from a single partition
                               // in the form of an array
    // Now you can do with the data whatever you want: iterate, save to a file, etc.
}

I didn't try this code, but it should work. Please write a comment if it doesn't compile. Of course, it will only work if the partitions are small enough. If they aren't, you can always increase the number of partitions with rdd.coalesce(numParts, true).
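
To illustrate that last point (a sketch; numParts is a placeholder you would tune so that each partition fits in the driver's heap):

// Split the RDD into more, and therefore smaller, partitions before collecting them one by one.
// shuffle = true is needed to actually increase the partition count.
val numParts = 200 // hypothetical value
val finerRdd = rdd.coalesce(numParts, shuffle = true)

finerRdd can then be fed into the loop above, or simply into toLocalIterator.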

Wildfire
  • 2
    does this code cause each partition to be computed serially when it loops through and calls mapPartitionsWithIndex? What's the best way to remedy this? – foboi1122 Nov 18 '15 at 00:42
  • @Wildfire Will this approach resolve [this](http://stackoverflow.com/questions/38044231/save-a-spark-rdd-using-mappartition-with-iterator/38046472#38046472)? If not, how could it be resolved, possibly using this approach? – ChikuMiku Jun 27 '16 at 13:55
  • @Wildfire - does it work in parallel across multiple executors? I am looking to perform parallel processing across executors but retrieve the results as list to driver for further processing. – Dwarrior Sep 13 '19 at 07:47
  • @Dwarrior Just perform shuffle after expensive computation, i.e.: `rdd.map(....).repartition(10).toLocalIterator`. In this case results will be computed in parallel and then collected sequentially to driver. – Wildfire Sep 13 '19 at 13:25
  • @Wildfire - Can you please check this question and let me know what am I doing wrong? Thanks! – Dwarrior Sep 15 '19 at 18:52
  • @Wildfire - https://stackoverflow.com/questions/57947269/spark-2-x-with-mappartitions-large-number-of-records-parallel-processing – Dwarrior Sep 18 '19 at 12:34
15

Wildfire's answer seems semantically correct, but I'm sure you should be able to be vastly more efficient by using the API of Spark. If you want to process each partition in turn, I don't see why you can't use the map/filter/reduce/reduceByKey/mapPartitions operations. The only time you'd want to have everything in one place in one array is when you're going to perform a non-monoidal operation - but that doesn't seem to be what you want. You should be able to do something like:

rdd.mapPartitions(recordsIterator => your code that processes a single chunk)

Or this

rdd.foreachPartition(partition => {
  val records = partition.toArray
  // Your code
})
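
For instance, a rough sketch of what the per-partition code might look like (the record processing here is made up for illustration):

// Transform each partition as a chunk; nothing is collected to the driver.
val lengths = rdd.mapPartitions { recordsIterator =>
  recordsIterator.map(record => record.toString.length)
}
// Or run a side effect per partition on the executors, e.g. write each chunk somewhere.
rdd.foreachPartition { partition =>
  partition.foreach(record => println(record))
}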
samthebest
10

Here is the same approach as suggested by @Wildfire, but written in pyspark.

The nice thing about this approach is that it lets the user access the records in the RDD in order. I'm using this code to feed data from an RDD into the STDIN of a machine learning tool's process.

rdd = sc.parallelize(range(100), 10)
def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    data_from_part_rdd = part_rdd.collect()
    print "partition id: %s elements: %s" % (part_id, data_from_part_rdd)

Produces output:

partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
vvladymyrov
2

pyspark dataframe solution using RDD.toLocalIterator():

import cStringIO

separator  = '|'
df_results = hiveCtx.sql(sql)
columns    = df_results.columns
print separator.join(columns)

# Use toLocalIterator() rather than collect(), as this avoids pulling all of the
# data to the driver at one time.  Rather, "the iterator will consume as much memory
# as the largest partition in this RDD."
MAX_BUFFERED_ROW_COUNT = 10000
row_count              = 0
output                 = cStringIO.StringIO()
for record in df_results.rdd.toLocalIterator():
    d = record.asDict()
    output.write(separator.join([str(d[c]) for c in columns]) + '\n')
    row_count += 1
    if row_count % MAX_BUFFERED_ROW_COUNT == 0:
        print output.getvalue().rstrip()
        # it is faster to create a new StringIO rather than clear the existing one
        # http://stackoverflow.com/questions/4330812/how-do-i-clear-a-stringio-object
        output = cStringIO.StringIO()
if row_count % MAX_BUFFERED_ROW_COUNT:
    print output.getvalue().rstrip()
Mark Rajcok
1

Map/filter/reduce using Spark and download the results later? I think the usual Hadoop approach will work.

The API says there are map, filter, and saveAsTextFile operations: https://spark.incubator.apache.org/docs/0.8.1/scala-programming-guide.html#transformations
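
A rough sketch of that approach (the transformations and the output path are placeholders):

// Do the heavy processing on the cluster, write a much smaller result to distributed
// storage, and read those files back on the local machine afterwards.
val result = rdd
  .filter(record => record.toString.nonEmpty) // placeholder filter
  .map(record => record.toString)             // placeholder transformation
result.saveAsTextFile("hdfs:///tmp/rdd-output") // placeholder output path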

ya_pulser
  • Bad option. I don't want to do serialization/deserialization, so I want to retrieve this data from Spark directly. – epahomov Feb 11 '14 at 10:37
  • How do you intend to get 1 GB without serde (i.e. storing on disk) on a node with 512 MB? – Prashant Sharma Feb 12 '14 at 09:13
  • 1
    By iterating over the RDD. You should be able to get each partition in sequence to send each data item in sequence to the master, which can then pull them off the network and work on them. – interfect Feb 12 '14 at 18:07
1

For Spark 1.3.1, the format is as follows:

val parts = rdd.partitions
for (p <- parts) {
  val idx = p.index
  val partRdd = rdd.mapPartitionsWithIndex {
    case (index: Int, value: Iterator[(String, String, Float)]) =>
      if (index == idx) value else Iterator()
  }
  val dataPartitioned = partRdd.collect
  // Apply further processing on dataPartitioned
}
agankur21