I am new to Spark and am having difficulty wrapping my mind around its way of thinking. The following problems seem generic, but I have no idea how to solve them using only Spark and the memory of its nodes.

I have two lists (i.e., RDDs):

  1. List1 - (id, start_time, value) where the tuple (id, start_time) is unique
  2. List2 - (id, timestamp)

First problem: go over List2 and, for each (id, timestamp), find the value in List1 that has the same id and the latest start_time before the timestamp.

For example:

List1:
 (1, 10:00, a)
 (1, 10:05, b)
 (1, 10:30, c)
 (2, 10:02, d)

List2:
 (1, 10:02)
 (1, 10:29)
 (2, 10:03)
 (2, 10:04)

Result:
 (1, 10:02) => a
 (1, 10:29) => b
 (2, 10:03) => d
 (2, 10:04) => d

Second problem: very similar to the first, but now the start_time and timestamp are fuzzy. This means that a time t may lie anywhere between (t - delta) and (t + delta). Again, I need to join the lists on time.

Notes:

  1. There is a solution to the first problem using Cassandra, but I'm interested in solving it using Spark and the memory of the nodes only.
  2. List1 has thousands of entries.
  3. List2 has tens of millions of entries.
Dror B.

1 Answer

For brevity, I have converted your time data (10:02) to decimal numbers (10.02). In practice, just use a function that converts the time string to a number.
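
One possible conversion, as a minimal sketch: parse the "HH:mm" string with java.time and use minutes since midnight. The helper name `toMinutes` is hypothetical and not part of the code below. Unlike the decimal shortcut, this keeps time differences correct (10:59 and 11:00 are one minute apart, but 10.59 and 11.00 differ by 0.41), which matters once a delta is involved.

```scala
import java.time.LocalTime

// Hypothetical helper (an assumption of this sketch): "HH:mm" -> minutes since midnight.
def toMinutes(time: String): Int =
  LocalTime.parse(time).toSecondOfDay / 60

// Convert a few of the start_time strings from the question.
val sample = Seq("10:00", "10:05", "10:30").map(toMinutes)
```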

The first problem can be easily solved using SparkSQL as shown below.

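// Required for toDF outside spark-shell (spark-shell imports it automatically).
import spark.implicits._

// List1: (id, start_time, value)
val list1 = spark.sparkContext.parallelize(Seq(
  (1, 10.00, "a"),
  (1, 10.05, "b"),
  (1, 10.30, "c"),
  (2, 10.02, "d"))).toDF("col1", "col2", "col3")

// List2: (id, timestamp)
val list2 = spark.sparkContext.parallelize(Seq(
  (1, 10.02),
  (1, 10.29),
  (2, 10.03),
  (2, 10.04)
)).toDF("col1", "col2")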

list1.createOrReplaceTempView("table1")

list2.createOrReplaceTempView("table2")


scala> spark.sql("""
     | SELECT col1,col2,col3
     | FROM
     | (SELECT
     | t2.col1, t2.col2, t1.col3,
     | ROW_NUMBER() over(PARTITION BY t2.col1, t2.col2 ORDER BY t1.col2 DESC) as rank
     | FROM table2 t2
     | LEFT JOIN table1 t1
     | ON t1.col1 = t2.col1
     | AND t2.col2 > t1.col2) tmp
     | WHERE tmp.rank = 1""").show()
+----+-----+----+
|col1| col2|col3|
+----+-----+----+
|   1|10.02|   a|
|   1|10.29|   b|
|   2|10.03|   d|
|   2|10.04|   d|
+----+-----+----+
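
As a sanity check on what the query computes, the same lookup can be sketched on plain Scala collections. This is only an illustration, not a replacement for the Spark job: times are encoded as minutes since midnight (an assumption; the query above uses decimals), and `byId` and `result` are names chosen for this sketch. A timestamp with no earlier start_time yields None, mirroring the NULL produced by the LEFT JOIN.

```scala
// List1 rows: (id, start_time, value); List2 rows: (id, timestamp).
// Times are minutes since midnight (assumption of this sketch).
val list1 = Seq((1, 600, "a"), (1, 605, "b"), (1, 630, "c"), (2, 602, "d"))
val list2 = Seq((1, 602), (1, 629), (2, 603), (2, 604))

// Group List1 by id so each lookup only scans rows with the same id.
val byId: Map[Int, Seq[(Int, String)]] =
  list1.groupBy(_._1).map { case (id, rows) => id -> rows.map(r => (r._2, r._3)) }

// For each (id, timestamp), keep the value with the latest start_time strictly before it.
val result: Seq[((Int, Int), Option[String])] = list2.map { case (id, ts) =>
  val candidates = byId.getOrElse(id, Seq.empty).filter(_._1 < ts)
  val best = if (candidates.isEmpty) None else Some(candidates.maxBy(_._1)._2)
  ((id, ts), best)
}
```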

Similarly, the solution to the second problem can be derived by changing only the join condition, as shown below.

// Placeholder tolerance (an assumption); replace with the actual value.
val delta = 0.05

// The s-interpolator is needed so that $delta is substituted into the query.
spark.sql(s"""
SELECT col1,col2,col3
FROM
(SELECT
t2.col1, t2.col2, t1.col3,
ROW_NUMBER() over(PARTITION BY t2.col1, t2.col2 ORDER BY t1.col2 DESC) as rank
FROM table2 t2
LEFT JOIN table1 t1
ON t1.col1 = t2.col1
AND t2.col2 between t1.col2 - $delta and t1.col2 + $delta) tmp
WHERE tmp.rank = 1""").show()
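
To see what this join condition selects, here is a rough plain-Scala sketch of the same rule: keep the List1 rows whose start_time lies within delta of the timestamp, then take the latest one. Times are minutes since midnight and `delta = 5` is an arbitrary illustrative value; both are assumptions of the sketch, as is the name `fuzzy`.

```scala
// Times are minutes since midnight; delta is an arbitrary tolerance for illustration.
val list1 = Seq((1, 600, "a"), (1, 605, "b"), (1, 630, "c"), (2, 602, "d"))
val list2 = Seq((1, 602), (1, 629), (2, 603), (2, 604))
val delta = 5

// Same rule as the BETWEEN condition: same id, start_time within [ts - delta, ts + delta],
// then the latest matching start_time wins (ORDER BY ... DESC, first row).
val fuzzy: Seq[((Int, Int), Option[String])] = list2.map { case (id, ts) =>
  val candidates = list1.filter { case (i, start, _) =>
    i == id && ts >= start - delta && ts <= start + delta
  }
  val best = if (candidates.isEmpty) None else Some(candidates.maxBy(_._2)._3)
  ((id, ts), best)
}
```

Note that with this condition a start_time slightly after the timestamp can also match, so (1, 10:02) resolves to b here rather than a.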
rogue-one
  • Excellent! Minor fix: RANK() should be used instead of ROW_NUMBER() in case there are identical rows in table2 – Dror B. May 23 '17 at 12:47