
I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:

  1. Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
  2. Group the data by device id. Thus we now have a map of deviceid => list of (bytes, eventdate).

  3. For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.

  4. Pick the last 30 days of bytes from this ordered set.

  5. Find the moving average of bytes for the last date using a time period of 30.

  6. Find the standard deviation of the bytes for the final date using a time period of 30.

  7. Return two values in the result, (mean - k*stddev) and (mean + k*stddev). [Assume k = 3]

I am using Apache Spark 1.3.0. The actual dataset is wider, and it eventually has to run on a billion rows.
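
For concreteness, the per-device arithmetic in steps 3 through 7 boils down to something like the following. This is a minimal pure-Scala sketch; the helper name deviceBand, its signature, and the use of the population standard deviation are assumptions of mine. It also relies on the yyyyMMdd eventdate sorting chronologically as a plain integer.

// Hypothetical helper: given one device's (bytes, eventdate) pairs, sort by
// eventdate, keep the last 30 days, and return (mean - k*stddev, mean + k*stddev).
def deviceBand(records: Seq[(Long, Int)], window: Int = 30, k: Double = 3.0): (Double, Double) = {
  val bytes  = records.sortBy(_._2)        // yyyyMMdd integers sort chronologically
                      .takeRight(window)   // last 30 days (or fewer, if the device has less history)
                      .map(_._1.toDouble)
  val mean   = bytes.sum / bytes.size
  val stddev = math.sqrt(bytes.map(x => (x - mean) * (x - mean)).sum / bytes.size)
  (mean - k * stddev, mean + k * stddev)   // step 7, with k = 3 by default
}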

Here is the data structure for the dataset.

package com.testing
case class DailyDeviceAggregates (
                        device_id: Integer,
                        bytes: Long,
                        eventdate: Integer
                   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}
object DailyDeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
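
Since the case class extends Ordered, a plain Scala collection of these records sorts with the standard library directly. A quick, hypothetical REPL-style check against two of the sample rows above (the snippet is mine, not part of the project):

val sample = Seq("14066921,1906,20150626", "14066921,1907,20150621")
  .map(DailyDeviceAggregates.parseLogLine)
println(sample.sorted.map(_.eventdate))   // List(20150621, 20150626), via the compare method above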

The DeviceAnalyzer object looks like this:

package com.testing
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Statistics Analyzer")
    val sc = new SparkContext(sparkConf)
    val logFile = args(0)
    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    deviceIdsMap.foreach(a => {
         // I am stuck here !!
    })
    sc.stop()
  }
}

But I am stuck with the actual implementation of this algorithm beyond this point.
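
For what it is worth, one way the missing piece could look is to transform the grouped RDD with mapValues instead of iterating over it with foreach, so the result stays an RDD and can be written back to HDFS. This is only a sketch under a few assumptions of mine: a single device's history fits in executor memory, a per-device helper like the deviceBand function sketched after the task list is available, and the HDFS output directory arrives as a hypothetical second argument args(1).

    val results = deviceIdsMap.mapValues { aggregates =>
      // One device's records: hand the (bytes, eventdate) pairs to a helper such as
      // deviceBand above, which sorts by eventdate, keeps the last 30 days, and
      // returns the (mean - 3*stddev, mean + 3*stddev) band.
      deviceBand(aggregates.map(a => (a.bytes, a.eventdate.toInt)).toSeq)
    }
    results.map { case (id, (lo, hi)) => s"$id,$lo,$hi" }
           .saveAsTextFile(args(1))   // args(1): assumed HDFS output directory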

  • What's the sample rate? Will the data for one day fit into memory? Unless you have an extremely high sample rate, I don't see why not. So I would suggest just implementing the aggregation in normal scala and write some tests for it to make sure it does what you want. – Rüdiger Klaehn Jul 09 '15 at 07:18
  • I can't see that you're really stuck yet. The next step is "For each device, sort the set by eventdate." What's stopping you doing that step? I think you need to show more evidence of attempting this, or it just looks like you want us to write your code. – The Archetypal Paul Jul 09 '15 at 07:43
  • You should make the question more focused. I don't need to know every detail of the project to help you. Just ask the specific question you need help with. (Like Paul says.) – Daniel Darabos Jul 09 '15 at 09:56
  • The data for one day will fit in memory now. Ultimately this will run on a big cluster so that can be tackled when the code starts working. Where I am stuck is the Scala syntax. I'll try to get some hints from http://stackoverflow.com/questions/23402303/apache-spark-moving-average and proceed today. – Anupam Bagchi Jul 09 '15 at 14:47

1 Answer


I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an array of all device-aggregates for this device

      println(allaggregates)
      val sortedAggregates = Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
      println(sortedAggregates) // This does not work - returns an empty array !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray  // This should be sortedAggregates.map (but does not compile)
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}

I would really appreciate it if someone could suggest improvements for the following (one possible direction is sketched after this list):

  1. The call to Sorting.quickSort is not working. Perhaps I am calling it the wrong way.
  2. I would like to use the Spark MLlib class MultivariateStatisticalSummary to calculate the statistical values.
  3. For that I would need to keep all my intermediate values as RDDs so that I can directly use the RDD methods to do the job.
  4. At the end I also need to write the results to HDFS, for which the RDD class provides a method, which is another reason I would like to keep everything as RDDs.
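
Not a full answer, just a hedged sketch of the direction points 1 and 4 could take (variable names match the foreach body above; args(1) as an output path is my own assumption). On point 1, Sorting.quickSort sorts its array argument in place and returns Unit, which is why printing its result shows () rather than a sorted array; sorting a copy and keeping the copy fixes that. On points 2 and 3, Statistics.colStats expects an RDD[Vector] and summarizes columns across the whole RDD, so it is not a natural fit for per-device windows; keeping the per-device arithmetic inside an RDD transformation such as mapValues (as in the sketch under the question above) is one alternative, and it also covers point 4 because the resulting pair RDD can be written straight to HDFS.

      // Point 1: quickSort works in place and returns Unit, so keep a reference to the array.
      val sortedAggregates = allaggregates.toArray
      Sorting.quickSort(sortedAggregates)                       // sortedAggregates is now ordered by eventdate
      val byteValues = sortedAggregates.map(_.bytes.toDouble)   // bytes, now genuinely in eventdate order

      // Point 4: a pair RDD of (device_id, (lower, upper)) results can be saved directly, e.g.
      // results.map { case (id, (lo, hi)) => s"$id,$lo,$hi" }.saveAsTextFile(args(1))  // args(1): assumed output path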