import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity.Entropy
object ScalaApp {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("Program")
val sc = new SparkContext(conf)
val rawData = sc.textFile("/home/sangeen/Kaggle/train.tsv")
val records = rawData.map(line => line.split("\t"))
// inspect the first parsed record (println on an Array prints a reference, so join the fields)
println(records.first.mkString("\t"))
/*
we will have to do a bit of data cleaning during
our initial processing by trimming out the extra quotation characters ("). There are
also missing values in the dataset; they are denoted by the "?" character. In this case,
we will simply assign a zero value to these missing values:
*/
val data = records.map { r =>
val trimmed = r.map(_.replaceAll("\"", ""))
val label = trimmed(r.size - 1).toInt
val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
LabeledPoint(label, Vectors.dense(features))
}
/*
In the preceding code, we extracted the label variable from the last column and an
array of features for columns 5 to 25 after cleaning and dealing with missing values.
We converted the label to an Int value and the features to an Array[Double].
Finally, we wrapped the label and features in a LabeledPoint instance, converting
the features into an MLlib Vector.
We will also cache the data and count the number of data points:
You will see that the value of numData is 7395.
*/
data.cache
val numData = data.count
println("value of numData is : " + numData)
/*
We will explore the dataset in more detail a little later, but we will tell you now
that there are some negative feature values in the numeric data. As we saw earlier,
the naïve Bayes model requires non-negative features and will throw an error if it
encounters negative values. So, for now, we will create a version of our input feature
vectors for the naïve Bayes model by setting any negative feature values to zero:
*/
val nbData = records.map { r =>
val trimmed = r.map(_.replaceAll("\"", ""))
val label = trimmed(r.size - 1).toInt
val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble).map(d => if (d < 0) 0.0 else d)
LabeledPoint(label, Vectors.dense(features))}
val numIterations = 10
val maxTreeDepth = 5
//Now, train each model in turn. First, we will train logistic regression:
val lrModel = LogisticRegressionWithSGD.train(data, numIterations)
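// The other imported models could be trained the same way; this is a sketch
// based on the MLlib 1.x API (naive Bayes must use the non-negative nbData,
// and the decision tree uses maxTreeDepth with the Entropy impurity):
val svmModel = SVMWithSGD.train(data, numIterations)
val nbModel = NaiveBayes.train(nbData)
val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)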
}
}
Running this code gives me the following error:
[error] (run-main-1) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NumberFormatException: For input string: ",urlid,boilerplate,alchemy_category,alchemy_category_score,avglinksize,commonlinkratio_1,commonlinkratio_2,commonlinkratio_3,commonlinkratio_4,compression_ratio,embed_ratio,framebased,frameTagRatio,hasDomainLink,html_ratio,image_ratio,is_news,lengthyLinkDomain,linkwordscore,news_front_page,non_markup_alphanum_characters,numberOfLinks,numwords_in_url,parametrizedLinkRatio,spelling_errors_ratio,label"
[error] at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
[error] at java.lang.Integer.parseInt(Integer.java:481)
[error] at java.lang.Integer.parseInt(Integer.java:527)
[error] at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
[error] at scala.collection.immutable.StringOps.toInt(StringOps.scala:30)
[error] at ScalaApp$$anonfun$4.apply(Program.scala:29)
[error] at ScalaApp$$anonfun$4.apply(Program.scala:27)
[error] at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
[error] at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
[error] at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
[error] at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
[error] at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
[error] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
[error] at org.apache.spark.scheduler.Task.run(Task.scala:88)
[error] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[error] at java.lang.Thread.run(Thread.java:745)
[error] Driver stacktrace
[error] (compile:run) Nonzero exit code: 1
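Note the input string in the NumberFormatException: it is the header row of train.tsv (the column names, ending in "label"), so trimmed(r.size - 1).toInt is being called on the literal string "label" rather than on a numeric value. A minimal sketch of a fix, assuming the header is the first line of the file and no data row is identical to it:

val header = rawData.first
val noHeader = rawData.filter(_ != header)
val records = noHeader.map(line => line.split("\t"))

Alternatively, the header line could be stripped from the file before loading, for example with sed 1d train.tsv > train_noheader.tsv.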