0

It is evident that the out of box join capability in spark streaming does not warrent a lot of real life use cases. The reason being it joins only the data contained in the micro batch RDDs.

Use case is to join data from two kafka streams and enrich each object in stream1 with it's corresponding object in stream2 in spark and save it to HBase.

Implementation would

  • maintain a dataset in memory from objects from stream2, adding or replacing objects as and when they are recieved

  • for every element in stream1, access the cache to find a matching object from stream2, save to HBase if match is found or put it back on the kafka stream if not.

This question is on exploration of Spark streaming and it's API to find a way to implement the above mentioned.

2 Answers2

1

You can join the incoming RDDs to other RDDs -- not just the ones in that micro-batch. Basically you keep a "running total" RDD that you fill something like:

var globalRDD1: RDD[...] = sc.emptyRDD
var globalRDD2: RDD[...] = sc.emptyRDD

dstream1.foreachRDD(rdd => if (!rdd.isEmpty) globalRDD1 = globalRDD1.union(rdd))
dstream2.foreachRDD(rdd => if (!rdd.isEmpty) {
  globalRDD2 = globalRDD2.union(rdd))
  globalRDD1.join(globalRDD2).foreach(...) // etc, etc
}
David Griffin
  • 13,677
  • 5
  • 47
  • 65
  • Thanks for your response. What you have mentioned would keep the "cache" RDD up to date. However it would be expensive to keep broadcasting the RDD everytime. Any thoughts on that? Could there be a different approach? – user3840810 Apr 21 '16 at 06:30
  • What do you mean broadcast? There's no `broadcast` in my code. – David Griffin Apr 21 '16 at 10:50
  • Should have used a different word for it, did not mean broadcast as in the "Shared Variable". When you do globalRDD1.join(globalRDD2) everytime, you send all the data from the driver to executors - even that data which has been sent earlier during previous microbatches, and possibly already processed. Again thanks for your response. – user3840810 Apr 21 '16 at 11:12
  • No, I don't believe that's how it works. Otherwise, a `union` operator across two big `RDDs` would be very expensive, and it's not (not as expensive as shipping all the data back and forth to the driver). – David Griffin Apr 21 '16 at 11:15
  • Here, check out this answer: "union is a very efficient operation, because it doesn't move any data around" http://stackoverflow.com/questions/29977526/in-apache-spark-why-does-rdd-union-does-not-preserve-partitioner/29978189#29978189 – David Griffin Apr 21 '16 at 13:53
  • Good point. Can you also comment on the join operation being repeated for elements for which match is found in previous micro batches? Is idempotancy tracked somehow? – user3840810 Apr 25 '16 at 06:18
0

A good start would be to look into mapWithState. This is a more efficient replacement for updateStateByKey. These are defined on PairDStreamFunction, so assuming your objects of type V in stream2 are identified by some key of type K, your first point would go like this:

def stream2: DStream[(K, V)] = ???

def maintainStream2Objects(key: K, value: Option[V], state: State[V]): (K, V) = {
  value.foreach(state.update(_))
  (key, state.get())
}

val spec = StateSpec.function(maintainStream2Objects)

val stream2State = stream2.mapWithState(spec)

stream2State is now a stream where each batch contains the (K, V) pairs with the latest value seen for each key. You can do a join on this stream and stream1 to perform the further logic for your second point.

sgvd
  • 3,819
  • 18
  • 31