
I am trying to perform a join between a DStream and a static RDD.

PySpark

    #Create static data
    ip_classification_rdd = sc.parallelize([('log_name','enrichment_success')])
    #Broadcast it to all nodes
    ip_classification_rdd_broadcast = sc.broadcast(ip_classification_rdd)
    #Join stream with static dataset on field log_name
    joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_rdd[log_name]))

I get this exception: "It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations."

Scala

However, someone has the same requirement here: How to join a DStream with a non-stream file?

And this was the solution:

val vdpJoinedGeo = goodIPsFltrBI.flatMap{ip => geoDataBC.value.get(ip).map(data => (ip, data))}

What is the equivalent of this in PySpark?

steven

1 Answer


A couple of changes are required in your code:

  • You cannot broadcast an RDD: instead, broadcast the underlying data.
  • You then access the broadcast data inside the closure through the broadcast variable's value attribute (it is an attribute, not a method).

Here is an approximation of what your updated code might look like:

    #Create static data as a plain Python list (not an RDD); broadcasting
    #a dict directly would make the lookup below cheaper
    data = [('log_name', 'enrichment_success')]
    #Broadcast it to all nodes
    ip_classification_broadcast = sc.broadcast(data)
    #Look up each record's key in the broadcast data inside the closure;
    #value is an attribute of the broadcast variable, not a method
    joinedStream = kafkaStream.transform(lambda rdd: rdd.map(
        lambda kv: (kv[0], (kv[1], dict(ip_classification_broadcast.value).get(kv[0])))))
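
For comparison, here is a rough PySpark translation of the Scala snippet cited in the question. This is only a sketch under assumptions: `goodIPsFltrBI` is taken to be a DStream of IP strings, and the geo data is broadcast as a plain dict whose sample contents here are made up:

    #Hypothetical broadcast of plain geo data (a dict, not an RDD)
    geoDataBC = sc.broadcast({'1.2.3.4': 'US'})
    #flatMap mirrors the Scala Option.map: emit (ip, data) only when
    #the IP is present in the broadcast dict
    vdpJoinedGeo = goodIPsFltrBI.flatMap(
        lambda ip: [(ip, geoDataBC.value[ip])] if ip in geoDataBC.value else [])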
WestCoastProjects
  • Snippet of the error log: `rdd.join(ip_classification_broadcast.value().get()[log_name]))` gives `TypeError: 'list' object is not callable` – steven Aug 17 '18 at 12:39
  • Looks like `value` is an attribute, not a method - so I've updated the code above: try `.get[log_name]` instead of `get()[log_name]`. You will need to tweak these details a bit since I don't have your code and test setup. – WestCoastProjects Aug 17 '18 at 12:55
  • To debug, I have run the content of the join function. `ip_classification_broadcast.value().get[log_name]` results in: `TypeError: 'list' object is not callable`. This: `ip_classification_broadcast.value[0]` results in: `('log_name', 'enrichment_success')`. However, when I run the spark-submit, this is the error: `AttributeError: 'tuple' object has no attribute 'mapValues'` – steven Aug 17 '18 at 13:09
  • Ah - that means we're close! You might need to restructure the `data` so that it is a `dict`, not a `tuple`. But before doing that, using the current `tuple`, the following should give you `enrichment_success` as the output: `ip_classification_broadcast.value[0][1]`. Notice the `[1]` that accesses the second element of the `tuple`. Later on you'll want to be able to access via the key `log_name`: that will require restructuring. – WestCoastProjects Aug 17 '18 at 13:51
  • CASE 1: `ip_classification_broadcast = sc.broadcast(data); joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_broadcast.value[0]))` ERROR: `AttributeError: 'tuple' object has no attribute 'mapValues'` – steven Aug 18 '18 at 11:09
  • CASE 2: `ip_classification_broadcast = sc.broadcast(data); joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_broadcast.value[0][0]))` ERROR: `AttributeError: 'str' object has no attribute 'mapValues'` – steven Aug 18 '18 at 11:10
  • CASE 3: `ip_classification_rdd = sc.parallelize([('log_name','enrichment_success')]); joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_broadcast))` COMPILES BUT NO OUTPUT on print – steven Aug 18 '18 at 11:11
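
If an actual `join` is wanted rather than a broadcast lookup, note that `rdd.join()` expects another RDD on its right-hand side, which is why passing a `tuple`, `str`, or broadcast variable fails as in the cases above. The pattern from the Spark Streaming programming guide is to build the static RDD once on the driver and reference it inside `transform`; a minimal sketch, assuming the stream's records are keyed on `log_name`:

    #Static dataset as a real RDD: this is what join() expects
    ip_classification_rdd = sc.parallelize([('log_name', 'enrichment_success')])
    #The transform() function runs on the driver for each batch, so the
    #static RDD can be referenced here without broadcasting it
    joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_rdd))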