I'm trying to learn streaming data and manipulating it with the telecom churn dataset provided here. I've written a method to calculate this in batch:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD, LogisticRegressionWithLBFGS, LogisticRegressionModel, NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
object batchChurn{
def main(args: Array[String]): Unit = {
//setting spark context
val conf = new SparkConf().setAppName("churn")
val sc = new SparkContext(conf)
//loading and mapping data into RDD
val csv = sc.textFile("file://filename.csv")
val data = csv.map {line =>
val parts = line.split(",").map(_.trim)
val stringvec = Array(parts(1)) ++ parts.slice(4,20)
val label = parts(20).toDouble
val vec = stringvec.map(_.toDouble)
LabeledPoint(label, Vectors.dense(vec))
}
val splits = data.randomSplit(Array(0.7,0.3))
val (training, testing) = (splits(0),splits(1))
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 6
val featureSubsetStrategy = "auto"
val impurity = "gini"
val maxDepth = 7
val maxBins = 32
val model = RandomForest.trainClassifier(training, numClasses, categoricalFeaturesInfo,numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
val labelAndPreds = testing.map {point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
}
}
I've had no problems with this. Now, I looked at the NetworkWordCount example provided on the spark website, and changed the code slightly to see how it would behave.
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("127.0.0.1", 9999)
val data = lines.flatMap(_.split(","))
My question is: is it possible to convert this DStream to an array which I can input into my analysis code? Currently when I try to convert to Array using val data = lines.flatMap(_.split(","))
, it clearly says that:error: value toArray is not a member of org.apache.spark.streaming.dstream.DStream[String]