
I'm using Spark 1.4.1. When I try to broadcast a random forest model, it shows me this error:

Traceback (most recent call last):
  File "/gpfs/haifa/home/d/a/davidbi/codeBook/Nice.py", line 358, in <module>
    broadModel = sc.broadcast(model)
  File "/opt/apache/spark-1.4.1-bin-hadoop2.4_doop/python/lib/pyspark.zip/pyspark/context.py", line 698, in broadcast
  File "/opt/apache/spark-1.4.1-bin-hadoop2.4_doop/python/lib/pyspark.zip/pyspark/broadcast.py", line 70, in __init__
  File "/opt/apache/spark-1.4.1-bin-hadoop2.4_doop/python/lib/pyspark.zip/pyspark/broadcast.py", line 78, in dump
  File "/opt/apache/spark-1.4.1-bin-hadoop2.4_doop/python/lib/pyspark.zip/pyspark/context.py", line 252, in __getnewargs__
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

An example of the code I'm trying to execute:

sc = SparkContext(appName="Something")
model = RandomForest.trainRegressor(
    sc.parallelize(data), categoricalFeaturesInfo=categorical,
    numTrees=100, featureSubsetStrategy="auto",
    impurity='variance', maxDepth=4)
broadModel = sc.broadcast(model)

If someone can help me with this I will be very thankful. Thanks a lot!

  • Is there a reason why you need to broadcast the whole model? The model can run predictions on RDDs of inputs. – Magsol Aug 18 '15 at 14:19
  • There is more than one model (in my case each model defines a group). Each sample needs to get a prediction from each model to know which group it fits best. I'm handling big data, so I need to broadcast the models to the mappers. – dadibiton Aug 18 '15 at 14:23

1 Answer


Short answer: it is not possible using PySpark. callJavaFunc, which is required for prediction, uses the SparkContext, hence the error. It would be possible to do something like this using the Scala API, though.
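To see why the broadcast itself blows up, note that the last frame of the traceback is SparkContext.__getnewargs__: broadcasting pickles the value, the model wrapper holds a reference to the context, and PySpark deliberately makes the context raise when pickled. A minimal sketch of that mechanism (FakeSparkContext and FakeModel are illustrative names, not PySpark classes):

```python
import pickle

class FakeSparkContext:
    # PySpark's SparkContext defines __getnewargs__ to raise, so any
    # attempt to pickle an object that references it fails (SPARK-5063).
    def __getnewargs__(self):
        raise Exception(
            "It appears that you are attempting to reference SparkContext "
            "from a broadcast variable, action, or transformation.")

class FakeModel:
    def __init__(self, sc):
        self._sc = sc  # the MLlib model wrapper keeps a context reference

try:
    # sc.broadcast(model) pickles the model, which pickles the context
    pickle.dumps(FakeModel(FakeSparkContext()))
except Exception as e:
    print("pickling failed:", e)
```

So the error is raised on the driver at broadcast time, before anything reaches a worker.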

In Python you can use the same approach as for a single model: call model.predict for each model, then zip the results.

from functools import reduce  # needed on Python 3; built-in on Python 2

models = [model1, model2, model3]

# One prediction RDD per model
predictions = [
    model.predict(testData.map(lambda x: x.features)) for model in models]

def flatten(x):
    # After the first zip each element is a plain 2-tuple; later zips
    # nest the accumulated tuple, so unpack it to keep results flat.
    if isinstance(x[0], tuple):
        return tuple(list(x[0]) + [x[1]])
    else:
        return x

# Pair each label with a tuple of predictions, one per model
(testData
    .map(lambda lp: lp.label)
    .zip(reduce(lambda p1, p2: p1.zip(p2).map(flatten), predictions)))

If you want to know more about the source of the problem, please check: How to use Java/Scala function from an action or a transformation?
