I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
- Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
- Group the data by device id, so that we have a map of deviceid => (bytes, eventdate) pairs.
- For each device, sort the records by eventdate. We now have an ordered sequence of bytes per device, ordered by eventdate.
- Pick the last 30 days of bytes from this ordered sequence.
- Find the moving average of bytes for the last date, using a time period of 30.
- Find the standard deviation of the bytes for the last date, using the same time period of 30.
- Return two values in the result, (mean - k*stddev) and (mean + k*stddev), assuming k = 3; a rough sketch of these per-device steps follows this list.
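To make the per-device calculation concrete, here is a minimal sketch of the sorting, windowing and statistics steps for a single device. It assumes the (bytes, eventdate) pairs for one device fit in memory, that there is one record per device per day (so the last 30 records are the last 30 days), and that the population standard deviation is what I need; the function name is mine, just for illustration:

def statsBounds(pairs: Seq[(Long, Int)], k: Double = 3.0): (Double, Double) = {
  // Sort by eventdate and keep the bytes for the most recent 30 days.
  val window = pairs.sortBy(_._2).takeRight(30).map(_._1.toDouble)
  // Mean and (population) standard deviation over that 30-day window.
  val mean = window.sum / window.size
  val stddev = math.sqrt(window.map(b => (b - mean) * (b - mean)).sum / window.size)
  // Return (mean - k*stddev, mean + k*stddev), with k = 3 by default.
  (mean - k * stddev, mean + k * stddev)
}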
I am using Apache Spark 1.3.0. The actual dataset is wider, and the job eventually has to run on a billion rows.
Here is the data structure for the dataset.
package com.testing

case class DeviceAggregates(
  device_id: Integer,
  bytes: Long,
  eventdate: Integer
) extends Ordered[DeviceAggregates] {
  def compare(that: DeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}

object DeviceAggregates {
  def parseLogLine(logline: String): DeviceAggregates = {
    val c = logline.split(",")
    DeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
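For example, parsing the first sample line shown above should produce:

val rec = DeviceAggregates.parseLogLine("15590657,246620,20150630")
// rec == DeviceAggregates(15590657, 246620, 20150630)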
The DeviceAnalyzer class looks like this:
package com.testing

import com.testing.DeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Statistics Analyzer")
    val sc = new SparkContext(sparkConf)
    val logFile = args(0)
    val deviceAggregateLogs = sc.textFile(logFile).map(DeviceAggregates.parseLogLine).cache()
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    deviceIdsMap.foreach { group =>
      // I am stuck here !!
    }
    sc.stop()
  }
}
But I am stuck with the actual implementation of this algorithm beyond this point.
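Roughly, what I think the per-device processing should look like is something along these lines. This is only an untested sketch of my intent, under the same assumptions as above (one record per device per day, population standard deviation), and it uses mapValues instead of foreach so the per-device bounds come back as an RDD that can be saved or collected:

val deviceBounds = deviceIdsMap.mapValues { recs =>
  // Sort this device's records by eventdate (using the Ordered trait on the
  // case class) and keep the bytes for the last 30 days.
  val window = recs.toSeq.sorted.takeRight(30).map(_.bytes.toDouble)
  // Mean and (population) standard deviation over that window.
  val mean = window.sum / window.size
  val stddev = math.sqrt(window.map(b => (b - mean) * (b - mean)).sum / window.size)
  // (mean - k*stddev, mean + k*stddev) with k = 3.
  (mean - 3 * stddev, mean + 3 * stddev)
}

// Just for illustration; on the real dataset I would write the results out
// rather than collect them to the driver.
deviceBounds.collect().foreach { case (id, (lower, upper)) =>
  println(s"$id: ($lower, $upper)")
}

Is something like this the right way to fill in the body, and will groupBy followed by mapValues like this hold up on a billion rows?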