14

I have a delicate Spark problem, where i just can't wrap my head around.

We have two RDDs ( coming from Cassandra ). RDD1 contains Actions and RDD2 contains Historic data. Both have an id on which they can be matched/joined. But the problem is the two tables have an N:N relation ship. Actions contains multiple rows with the same id and so does Historic. Here are some example date from both tables.

Actions time is actually a timestamp

id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic set_at is actually a timestamp

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75

How can we join these two tables in a way, that we get a result like this

1   |  100  # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1   |  50   # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2   |  50   # 125 - 75  for Actions#3 with time 12:30 because H. was in that time at 75

I can't come up with a good solution that feels right, without making a lot of iterations over huge datasets. I always have to think about making a range from the Historic set and then somehow check if the Actions fits in the range e.g (11:00 - 12:15) to make the calculation. But that seems to pretty slow to me. Is there any more efficient way to do that? Seems to me, that this kind of problem could be popular, but i couldn't find any hints on this yet. How would you solve this problem in spark?

My current attempts so far ( in half way done code )

case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)

historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...) 
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))

// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple
Sean Owen
  • 66,182
  • 23
  • 141
  • 173
M. Hirn
  • 725
  • 7
  • 22
  • Could you add your current attempts to the question? – maasg Nov 25 '14 at 23:46
  • Thanks maasg for your respond, I added my current attempts which aren't far yet. I am a few days into spark and scala and it doesn't feel intuitive to me at all. My current thoughts on the solve are really elementary and i guess this is far from being performant. Anyway i would do it something like this, although i am not sure if it's working out. – M. Hirn Nov 26 '14 at 09:16
  • Your own effort is always appreciated. – maasg Nov 26 '14 at 10:10
  • I just figured out, that the last two transformations especially `someKindOfSort` won't work out. So, the first reduceByKey needs to do the Range storage directly, which is also way better, because less iterations are needed. I'll update the post. – M. Hirn Nov 26 '14 at 10:15
  • is this an event sequence? Meaning that given these two records: (1,11:00,400), (1,12:15,450) does it tell us that id1 had value 400 from 11:00-12:15 and value 450 from 12:15 onwards? Does your 'action' data have a base granularity? (like 5 minutes in the example?) – maasg Nov 26 '14 at 10:50
  • Yes, that's true. `Historic` shows an event sequence. The `Action` and `Historic` dates are actually timestamps, i just wrote it here like that so we - as humans - can read it more easily. – M. Hirn Nov 26 '14 at 11:03

3 Answers3

4

It's an interesting problem. I also spent some time figuring out an approach. This is what I came up with:

Given case classes for Action(id, time, x) and Historic(id, time, y)

  • Join the actions with the history (this might be heavy)
  • filter all historic data not relevant for a given action
  • key the results by (id,time) - differentiate same key at different times
  • reduce the history by action to the max value, leaving us with relevant historical record for the given action

In Spark:

val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time>historic.t) => ((action.id, action.time),(action,historic))}
val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) =>  (a1, if (h1.t>h2.t) h1 else h2)}

// we are done, let's produce a report now
val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id,time,action.X -historic.y)}

Using the data provided above, the report looks like:

report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))

(I transformed the time to seconds to have a simplistic timestamp)

maasg
  • 37,100
  • 11
  • 88
  • 115
  • Thanks maasg, for you're answer. You solution looks very clean and is the superior one, because mine doesn't scale with a lot of Historic Data. Great answer, I appreciated it. – M. Hirn Nov 27 '14 at 09:10
0

After a few hours of thinking, trying and failing I came up with this solution. I am not sure if it is any good, but due the lack of other options, this is my solution.

First we expand our case class Historic

case class Historic(id: String, set_at: Long, valueY: Int) {
  val set_at_map = new java.util.TreeMap[Long, Int]() // as it seems Scala doesn't provides something like this with similar operations we'll need a few lines later
  set_at_map.put(0, valueY) // Means from the beginning of Epoch ...
  set_at_map.put(set_at, valueY) // .. to the set_at date

  // This is the fun part. With .getHistoricValue we can pass any timestamp and we will get the a value of the key back that contains the passed date. For more information look at this answer: http://stackoverflow.com/a/13400317/1209327
  def getHistoricValue(date: Long) : Option[Int] = {
    var e = set_at_map.floorEntry(date)                                   
    if (e != null && e.getValue == null) {                                  
      e = set_at_map.lowerEntry(date)                                     
    }                                                                         
    if ( e == null ) None else e.getValue()
  }
}

The case class is ready and now we bring it into action

val historicRDD = sc.cassandraTable[Historic](...)
  .map( row => ( row.id, row ) )
  .reduceByKey( (row1, row2) =>  {
    row1.set_at_map.put(row2.set_at, row2.valueY) // we add the historic Events up to each id
    row1
  })

// Now we load the Actions and map it by id as we did with Historic
val actionsRDD = sc.cassandraTable[Actions](...)
  .map( row => ( row.id, row ) )

// Now both RDDs have the same key and we can join them
val fin = actionsRDD.join(historicRDD)
  .map( row => {
    ( row._1.id, 
      (
        row._2._1.id, 
        row._2._1.valueX - row._2._2.getHistoricValue(row._2._1.time).get // returns valueY for that timestamp
      )
    )
  })

I am totally new to Scala, so please let me know if we could improve this code on some place.

M. Hirn
  • 725
  • 7
  • 22
  • Interesting approach. Will it scale up when you have a lot of historical values by key? – maasg Nov 27 '14 at 00:34
  • I would not recommend this solution if you have a lot of historic data, because the lookups are of O(log(N)) complexity, ergo it does not scale. (Luckily I am not having many historic entries, so it works pretty well). Here are more informations about this http://stackoverflow.com/a/1314708/1209327 – M. Hirn Nov 27 '14 at 09:05
  • The TreeMap could get possibly replaced by [Guavas RangeMap](http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/RangeMap.html), which might provide faster and scaleable lookups. Although I couldn't find any performance details about it. – M. Hirn Nov 27 '14 at 09:15
0

I know that this question has been answered but I want to add another solution that worked for me -

your data -

Actions 
id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic 
id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75
  1. Union Actions and Historic
    Combined
    id  |  time  | valueX | record-type
    1   |  12:05 | 500    | Action
    1   |  12:30 | 500    | Action
    2   |  12:30 | 125    | Action
    1   |  11:00 | 400    | Historic
    1   |  12:15 | 450    | Historic
    2   |  12:20 | 50     | Historic
    2   |  12:25 | 75     | Historic
  1. Write a custom partitioner and use repartitionAndSortWithinPartitions to partition by id, but sort by time.

    Partition-1
    1   |  11:00 | 400    | Historic
    1   |  12:05 | 500    | Action
    1   |  12:15 | 450    | Historic
    1   |  12:30 | 500    | Action
    Partition-2
    2   |  12:20 | 50     | Historic
    2   |  12:25 | 75     | Historic
    2   |  12:30 | 125    | Action
    
  2. Traverse through the records per partition.

If it is a Historical record, add it to a map, or update the map if it already has that id - keep track of the latest valueY per id using a map per partition.

If it is a Action record, get the valueY value from the map and subtract it from valueX

A map M

Partition-1 traversal in order M={ 1 -> 400} // A new entry in map M 1 | 100 // M(1) = 400; 500-400 M={1 -> 450} // update M, because key already exists 1 | 50 // M(1) Partition-2 traversal in order M={ 2 -> 50} // A new entry in M M={ 2 -> 75} // update M, because key already exists 2 | 50 // M(2) = 75; 125-75

You could try to partition and sort by time, but you need to merge the partitions later. And that could add to some complexity.

This, I found it preferable to the many-to-many join that we usually get when using time ranges to join.

Phani Rahul
  • 840
  • 7
  • 22