
I have a large CSV file with the columns id,time,location. I loaded it into an RDD and want to compute aggregated metrics per trip, where a trip is defined as a time-contiguous set of records of the same id, separated from that id's other records by at least 1 hour on either side. I am new to Spark. (related)

To do that, I plan to create an RDD with elements of the form (trip_id, (time, location)) and use reduceByKey to calculate all the needed metrics.
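For instance, once such a keyed RDD exists, the aggregation could look roughly like this; the name `trips` and the chosen metrics are placeholders for illustration only, not anything from the actual data:

    // Sketch: trips is assumed to be RDD[(TripId, (Long, Double))] of
    // (trip_id, (timeMillis, location)); compute start, end and record count per trip.
    val tripStats = trips
      .mapValues { case (t, _) => (t, t, 1L) }                // (minTime, maxTime, count)
      .reduceByKey { case ((lo1, hi1, n1), (lo2, hi2, n2)) =>
        (math.min(lo1, lo2), math.max(hi1, hi2), n1 + n2)
      }
      .mapValues { case (start, end, n) => (end - start, n) } // (trip duration, #records)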

To calculate the trip_id, I tried to implement the SQL approach of the linked question: build an indicator field marking whether a record is the start of a trip, and take a cumulative sum of that indicator. A cumulative sum does not sound like a distributed operation: is there a better approach?

Furthermore, how can I add this indicator field? It should be 1 if the time difference to the previous record of the same id is above an hour, and 0 otherwise. My first thought was to groupBy id and then sort within each group, but the grouped values end up inside an Array, which is not amenable to sortByKey, and there is no lead function as in SQL to get the previous value.

Example of the suggested aforementioned approach: for the RDD

(1,9:30,50)
(1,9:37,70)
(1,9:50,80)
(2,19:30,10)
(2,20:50,20)

We want to turn it first into the RDD with the time differences,

(1,9:30,50,inf)
(1,9:37,70,00:07:00)
(1,9:50,80,00:13:00)
(2,19:30,10,inf)
(2,20:50,20,01:20:00)

(The value for the earliest record of each id is, say, Scala's Double.PositiveInfinity constant.)

and then turn this last field into an indicator of whether it is above 1 hour, i.e. whether the record starts a trip,

(1,9:30,50,1)
(1,9:37,70,0)
(1,9:50,80,0)
(2,19:30,10,1)
(2,20:50,20,1)

and then turn it into a trip_id (the cumulative sum of the indicator),

(1,9:30,50,1)
(1,9:37,70,1)
(1,9:50,80,1)
(2,19:30,10,2)
(2,20:50,20,3)

and then use this trip_id as the key for the aggregations (a rough sketch of this whole pipeline follows the preprocessing code below).

The preprocessing was simply to load the file and delete the header,

val rawdata = sc.textFile("some_path")
def isHeader(line: String) = line.contains("id")
val data = rawdata.filter(!isHeader(_))
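Putting the pieces above together, here is one possible all-RDD sketch on top of this `data` RDD. The parsing into (id, epoch-millis, location), the one-hour threshold, and the per-id in-memory sort are all assumptions for illustration; each id's records must fit in memory for the sort, and the trip key is the pair (id, tripIndex) rather than a single global integer, which avoids a global cumulative sum:

    import java.sql.Timestamp

    val oneHourMs = 60 * 60 * 1000L

    // Parse each line into (id, timeMillis, location); the column layout is assumed.
    val parsed = data.map { line =>
      val f = line.split(',')
      (f(0).toInt, Timestamp.valueOf(f(1)).getTime, f(2).toDouble)
    }

    // Key every record by (id, tripIndex within that id): group the records of one id,
    // sort them by time in memory, mark gaps above one hour, and take the cumulative
    // sum of the gap indicator as the trip index.
    val byTrip = parsed
      .map { case (id, t, loc) => (id, (t, loc)) }
      .groupByKey()
      .flatMap { case (id, records) =>
        val sorted = records.toSeq.sortBy(_._1)
        val indicators = 1 +: sorted.zip(sorted.drop(1)).map {
          case ((t1, _), (t2, _)) => if (t2 - t1 > oneHourMs) 1 else 0
        }
        val tripIdx = indicators.scanLeft(0)(_ + _).tail   // cumulative sum per id
        sorted.zip(tripIdx).map { case ((t, loc), trip) => ((id, trip), (t, loc)) }
      }
    // byTrip: RDD[((Int, Int), (Long, Double))] keyed by (id, tripIndex),
    // ready for reduceByKey-style aggregations; (id, tripIndex) plays the role of trip_id.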

Edit

While trying to implement this with Spark SQL, I ran into an error regarding the time difference:

val lags=sqlContext.sql("
  select time - lag(time) over (partition by id order by time) as diff_time from data
");

since Spark doesn't know how to take the difference between two timestamps. What I ultimately need is to check whether this difference is above 1 hour.

It also doesn't recognize the getTime function that I found suggested online; the following returns an error too (Couldn't find window function time.getTime):

val lags=sqlContext.sql("
  select time.getTime() - (lag(time)).getTime() over (partition by id order by time) 
  from data
");

Even though making a similar lag difference for a numeric attribute works:

val lag_numeric=sqlContext.sql("
select longitude - lag(longitude) over (partition by id order by time) 
from data");  //works

Spark didn't recognize the (Joda-Time) function Hours.hoursBetween either. I'm using Spark 1.4.0.

I also tried to define an appropriate user-defined function, but UDFs are oddly not recognized inside queries:

val timestamp_diff: ((Timestamp, Timestamp) => Double) =
  (d1: Timestamp, d2: Timestamp) => d1.getTime() - d2.getTime()
val lags = sqlContext.sql("""
  select timestamp_diff(time, lag(time))
  over (partition by id order by time) from data
""")

So, how can Spark test whether the difference between two timestamps is above an hour?
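One possible workaround, given that the purely numeric lag above does work: convert the timestamp to epoch seconds in an inner query (unix_timestamp is a Hive built-in and should be available through the HiveContext; treat this as an untested sketch), so that the lag and the difference become ordinary numeric arithmetic:

    // Sketch: compute the gap in seconds per id; a gap above one hour is then
    // simply diff_seconds > 3600 (null marks the first record of each id).
    val lags = sqlContext.sql("""
      select id, time,
             ts - lag(ts) over (partition by id order by time) as diff_seconds
      from (select id, time, unix_timestamp(time) as ts from data) tmp
    """)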

Full code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext   // for window functions
import java.util.Date
import java.sql.Timestamp

val sqlContext = new HiveContext(sc)
import sqlContext._
import sqlContext.implicits._

case class Record(id: Int, time: Timestamp, longitude: Double, latitude: Double)

val raw_data = sc.textFile("file:///home/sygale/merged_table.csv")

val data_records = raw_data.map { line =>
  val fields = line.split(',')
  Record(fields(0).toInt,
         Timestamp.valueOf(fields(1)),
         fields(2).toDouble,
         fields(3).toDouble)
}

val data = data_records.toDF()
data.registerTempTable("data")
val lags = sqlContext.sql("""
  select time - lag(time) over (partition by id order by time) as diff_time from data
""")
  • Spark supports window functions since 1.4 and there are quite a few similar questions on SO. – zero323 Oct 07 '15 at 12:15
  • Possible duplicate of [Mapping timeseries data to previous datapoints and averages](http://stackoverflow.com/questions/32526328/mapping-timeseries-data-to-previous-datapoints-and-averages) – zero323 Oct 07 '15 at 12:19
  • But I'm not using Spark SQL. I'm not dealing with tables here, but with tuples. (That only represent rows logically) – Emolga Oct 07 '15 at 12:20
  • You can convert to a data frame, use window functions, extract the rdd. And some other alternatives: http://stackoverflow.com/a/31538829/1560062 – zero323 Oct 07 '15 at 12:22
  • Thank you. What about efficiency? Are the HiveContext window functions as fast as working programmatically with tuples, like in your 'alternative' link? Also, I didn't succeed in using the lag from the duplicate question. Added details in the question above. – Keldeo Oct 14 '15 at 11:27

0 Answers