
I am trying to multiprocess a list of RDDs as follows:

from pyspark.context import SparkContext
from multiprocessing import Pool



def square(rdd_list):
    def _square(i):
        return i*i
    return rdd_list.map(_square)

sc = SparkContext('local', 'Data_Split')
data = sc.parallelize([1,2,3,4,5,6])

dataCollection = [data, data, data]

p = Pool(processes=2)
result = p.map(square, dataCollection)
print result[0].collect()

I expect the output to be a list of RDDs, each element containing the squared elements of data.

However, running the code results in the following error:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

My questions are:

1) Why doesn't the code work as expected, and how can I fix it?

2) Will I gain any performance improvement (i.e., a shorter runtime) if I use p.map (from Pool) instead of a plain map over my list of RDDs?

srjit
  • Did you try using the multiprocessing library? – arshpreet Jul 14 '16 at 09:42
  • 1) Because you don't deal with plain Python state. Use threading http://stackoverflow.com/q/38048068/1560062 (and no, the GIL is not an issue here). 2) Unless you micromanage resources, probably not, but without a real context this is just guessing. – zero323 Jul 14 '16 at 09:47
  • Could you elaborate a bit / provide references for 'plain Python state'? Are you talking about "side effects" when you say plain Python state (specific to the code in the question)? – srjit Jul 14 '16 at 10:06
  • I mean that the PySpark "driver" is just a small client for the JVM, not a standalone driver. So it is not so much about Python objects as about the JVM and the Py4J gateway. As for threads, they are only there to achieve non-blocking job submission and to parallelize some secondary finalizing tasks; they won't touch the core processing. – zero323 Jul 14 '16 at 14:40

1 Answer


It's because when you use multiprocessing, the RDD has to be serialized/pickled before being sent to the other processes. Spark performs a check whenever an attempt to serialize an RDD is made, and throws that error.
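One way to avoid the pickling, following zero323's threading suggestion in the comments on the question, is to drive the jobs from a thread pool instead of a process pool: driver-side threads share the same SparkContext and Py4J gateway, so the RDD handles never have to cross a process boundary. A minimal sketch along those lines, reusing the square example from the question:

from pyspark.context import SparkContext
from multiprocessing.pool import ThreadPool  # threads, not processes

def square(rdd):
    # Transformation built on the driver; executed lazily on the workers.
    return rdd.map(lambda i: i * i)

sc = SparkContext('local[2]', 'Data_Split')  # two local worker threads
data = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd_list = [data, data, data]

# Each pool thread shares the driver's SparkContext, so no RDD is pickled
# across process boundaries; the threads only submit jobs concurrently.
pool = ThreadPool(2)
result = pool.map(square, rdd_list)
print(result[0].collect())  # [1, 4, 9, 16, 25, 36]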

Kien Truong
  • So, is there a way I can serialize the RDD programmatically and submit it to the Pool? Also, in your opinion, is this a better approach than using threads (in terms of runtime)? – srjit Jul 14 '16 at 11:43
  • I don't see the point of using multiple processes or threads in Spark driver code. In most cases you can partition the data to saturate the CPU resources with just one RDD execution. In that case it doesn't matter whether two RDDs are executed sequentially or in parallel; the total run time will be the same. – Kien Truong Jul 14 '16 at 11:59
  • To add some more context: I was trying to state the actual problem I am solving in a simple way. I am building n classifiers for one-vs-all classification using LR with LBFGS optimization, each classifier taking an RDD from a list of RDDs. I was wondering whether two models could be built simultaneously (if 2 cores are given to the pool), since all elements in the list of RDDs are independent of each other. – srjit Jul 14 '16 at 16:31
  • I have to wonder if you're really benefiting from doing it that way. If you just leverage the regular automatic parallelization that Spark does anyway, by writing your code in a one-thing-at-a-time format (a sketch of that style follows these comments), you'll free yourself from a lot of responsibility. – Kristian Jul 14 '16 at 17:25
  • This is exactly what my doubt was. :) My classifier is a queue prediction model. I have observed that, as the number of classes in the target increases, the time required to build all the models grows exponentially. I was looking for ways to reduce the time required (maybe by building two models simultaneously, if it worked that way). – srjit Jul 14 '16 at 17:42
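For the one-vs-all use case described above, the one-thing-at-a-time style Kristian suggests would look roughly like the sketch below. It assumes each element of rdd_list is an RDD of LabeledPoint with labels already recoded as 1 (this class) versus 0 (all others); the actual pipeline isn't shown in the question, so these names are illustrative.

from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Each train() call launches its own Spark jobs that can already use every
# available core, so a plain sequential loop on the driver is usually enough.
models = [LogisticRegressionWithLBFGS.train(rdd) for rdd in rdd_list]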