27
class ProdsTransformer:

    def __init__(self):
        self.products_lookup_hmap = {}
        self.broadcast_products_lookup_map = None

    def create_broadcast_variables(self):
        self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap)

    def create_lookup_maps(self):
        # The code here builds the hashmap that maps Prod_ID to another space.
        ...

pt = ProdsTransformer()
pt.create_broadcast_variables()

pairs = distinct_users_projected.map(lambda x: (x.user_id,
                         pt.broadcast_products_lookup_map.value[x.Prod_ID]))

I get the following error:

"Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063."

Any help with how to deal with the broadcast variables will be great!

Kyr
user3803714
  • That's not enough code and/or sample data for someone to try to duplicate the error and/or fix it. Also, in case you didn't notice, all the indentation is stripped out of the python. – Paul Jul 20 '15 at 04:33
  • I have added more code. – user3803714 Jul 20 '15 at 05:03
  • I wonder if the error would go away if you moved the `products_lookup_map` out of the properties of `ProdsTransformer` instances and instead made it a global. Do you need more than one map? – Paul Jul 20 '15 at 05:12
  • I need multiple maps. – user3803714 Jul 20 '15 at 05:14
  • 3
    OK, I think what the error means is that calling `sc` or rdd functions is forbidden within the workers, i.e. in any spark function like `map()`, `flatmap()`, `reduce()`, etc.... You can only call `sc.something` in the main program. So, for instance, you can chain maps, but you can't have a map within a map. And apparently broadcast is under that kind of restriction. I've seen spark mangle custom classes, and so there is some recreation of class instances on workers from serialized data that is being moved around. – Paul Jul 20 '15 at 05:16

1 Answer

29

Because your map lambda references the object containing your broadcast variable, Spark attempts to serialize the whole object and ship it to the workers. Since that object also holds a reference to the SparkContext, you get the error. Instead of this:

pairs = distinct_users_projected.map(lambda x: (x.user_id, pt.broadcast_products_lookup_map.value[x.Prod_ID]))

Try this:

bcast = pt.broadcast_products_lookup_map
pairs = distinct_users_projected.map(lambda x: (x.user_id, bcast.value[x.Prod_ID]))

The latter avoids any reference to the object (pt), so Spark only needs to ship the broadcast variable itself.
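The same serialization rule can be demonstrated without a cluster using plain pickle (PySpark serializes closures in a similar way). This is a minimal sketch, not the original code: `ProdsTransformerSketch`, `ctx`, and `lookup` are illustrative names, and a `threading.Lock` stands in for the unpicklable SparkContext.

```python
import pickle
import threading

class ProdsTransformerSketch:
    """Stand-in for the original class: `ctx` plays the role of the
    unpicklable SparkContext, `lookup` the broadcast value."""
    def __init__(self):
        self.ctx = threading.Lock()        # a lock cannot be pickled
        self.lookup = {"p1": "cat-a"}

pt = ProdsTransformerSketch()

# Serializing the whole object fails, just as shipping `pt` inside a
# map lambda would.
try:
    pickle.dumps(pt)
    whole_object_serializable = True
except TypeError:
    whole_object_serializable = False

# Extracting only the attribute the lambda needs serializes fine,
# which is why assigning the broadcast variable to a local first works.
lookup = pt.lookup
roundtrip = pickle.loads(pickle.dumps(lookup))
```

Here `whole_object_serializable` ends up `False` while `roundtrip` equals the original dict, mirroring why the local-variable version succeeds on workers.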

user2303197