
As a follow-up to my previous question, how do I map over an RDD locally, i.e., stream the data to the driver and process it there, without actually calling collect (because the data is far too large to fit in local memory)?

Specifically, I want to write something like

from subprocess import Popen, PIPE

with open('out', 'w') as out, open('err', 'w') as err:
    myproc = Popen([.....], stdin=PIPE, stdout=out, stderr=err)
    myrdd.iterate_locally(lambda x: myproc.stdin.write(x + '\n'))

How do I implement this iterate_locally? (A sketch of the behavior I'm after follows the list below.)

  • does NOT work: the value returned by collect is far too large to fit in local memory:

    for x in myrdd.collect():
        myproc.stdin.write(x + '\n')

  • does NOT work: foreach executes its argument on the workers (distributed), NOT locally on the driver:

    myrdd.foreach(lambda x: myproc.stdin.write(x+'\n'))
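
For reference, here is a minimal sketch of what I mean, assuming a Spark version that provides RDD.toLocalIterator (which, as I understand it, fetches one partition at a time to the driver):

from subprocess import Popen, PIPE

with open('out', 'w') as out, open('err', 'w') as err:
    # 'cat' is a stand-in for the real command
    myproc = Popen(['cat'], stdin=PIPE, stdout=out, stderr=err)
    # toLocalIterator streams the RDD to the driver one partition at a
    # time, so only a single partition's worth of data is in memory at once
    for x in myrdd.toLocalIterator():
        myproc.stdin.write(x + '\n')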

sds
2 Answers


What about RDD.foreachPartition? You can work on the data one partition-sized batch at a time, like this (in Scala):

myRdd.foreachPartition(it => it.foreach(...))

If you look at the feature request history, RDD.foreachPartition was created to straddle this middle ground.
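
In PySpark the same idea would look something like the sketch below; note, though, per the comments that follow, that the function still runs on the executors, not on the driver, so everything it references must be picklable and usable there:

def handle_partition(partition):
    # Runs on an executor, not on the driver: open any files or
    # connections here, inside the function
    batch = list(partition)  # materialize one partition as a batch
    # ... process the batch ...

myrdd.foreachPartition(handle_partition)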

David Griffin
  • is it guaranteed to process _sorted_ RDD _in order_? – sds May 27 '15 at 17:27
  • I believe so -- I believe this is the basis of `RDD.zipWithIndex`, which lets you create the equivalent of an AUTO INCREMENT index of type Long on your RDD. And I know `RDD.zipWithIndex` preserves the original ordering of the RDD. – David Griffin May 27 '15 at 17:35
  • 1
    I get `pickle.PicklingError: Cannot pickle files that are not opened for reading`: apparently, `foreachPartition` is still executed in a distributed mode, so `myproc.stdin` has to be picked and shipped (and it is obviously impossible). – sds May 27 '15 at 18:20
  • Hmm. I guess that makes sense. Sorry about that. – David Griffin May 27 '15 at 18:33

Your best option is likely to save the data to a source that your local machine can access and then iterate over that.
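
For example (a sketch: the output path is a placeholder, and it assumes the driver can read the same filesystem the cluster writes to):

import glob
from subprocess import Popen, PIPE

# saveAsTextFile writes one part file per partition to shared storage
myrdd.saveAsTextFile('/shared/myrdd')

with open('out', 'w') as out, open('err', 'w') as err:
    # 'cat' is a stand-in for the real command
    myproc = Popen(['cat'], stdin=PIPE, stdout=out, stderr=err)
    for path in sorted(glob.glob('/shared/myrdd/part-*')):
        with open(path) as f:
            for line in f:
                myproc.stdin.write(line)
    myproc.stdin.close()
    myproc.wait()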

If that isn't an option, and assuming your local machine can handle one partition's worth of data at a time, you can selectively bring back one partition at a time (I'd cache the data first) and then do something along the lines of:

rdd.cache()
for partition in range(rdd.getNumPartitions()):
    # Tag each partition with its index, then pull back only the one
    # partition we want on this pass
    data = rdd.mapPartitionsWithIndex(lambda index, itr: [(index, list(itr))])
    localData = data.filter(lambda x: x[0] == partition).collect()
    # Do work here
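
Each pass through the loop launches a job that scans every (cached) partition, which is why the cache() up front matters. Tying this back to the pipe in the question, the "Do work here" step would be something like this sketch, reusing myproc from the question:

for _, rows in localData:  # localData is [(partition_index, [rows...])]
    for x in rows:
        myproc.stdin.write(x + '\n')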
Holden