import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity.Entropy

object ScalaApp {
def main(args: Array[String]) {

   val conf = new SparkConf().setMaster("local").setAppName("Program")
   val sc = new SparkContext(conf)

   val rawData = sc.textFile("/home/sangeen/Kaggle/train.tsv")
   val records = rawData.map(line => line.split("\t"))
   println(records.first)


/*
we will have to do a bit of data cleaning during
our initial processing by trimming out the extra quotation characters ("). There are
also missing values in the dataset; they are denoted by the "?" character. In this case,
we will simply assign a zero value to these missing values:
*/

val data = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  LabeledPoint(label, Vectors.dense(features))
}




/*
    In the preceding code, we extracted the label variable from the last column and an
array of features for columns 5 to 25 after cleaning and dealing with missing values.
We converted the label to an Int value and the features to an Array[Double].
Finally, we wrapped the label and features in a LabeledPoint instance, converting
the features into an MLlib Vector.
We will also cache the data and count the number of data points:

You will see that the value of numData is 7395.
*/

 data.cache
 val numData = data.count

 println("value of numData is : " + numData)

/*
We will explore the dataset in more detail a little later, but we will tell you now
that there are some negative feature values in the numeric data. As we saw earlier,
the naïve Bayes model requires non-negative features and will throw an error if it
encounters negative values. So, for now, we will create a version of our input feature
vectors for the naïve Bayes model by setting any negative feature values to zero:
*/

  val nbData = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0  else d.toDouble).map(d => if (d < 0) 0.0 else d)
  LabeledPoint(label, Vectors.dense(features))}

  val numIterations = 10
  val maxTreeDepth = 5

//Now, train each model in turn. First, we will train logistic regression:


 val lrModel = LogisticRegressionWithSGD.train(data, numIterations)
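 // Editor's sketch (an assumption, not part of the original question): the
 // remaining imported models can be trained the same way with MLlib's
 // static train helpers:
 val svmModel = SVMWithSGD.train(data, numIterations)
 val nbModel = NaiveBayes.train(nbData)
 val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)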

 }
 }

The code gives me errors:

  [error] (run-main-1) org.apache.spark.SparkException: Job aborted due to  stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost  task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NumberFormatException: For input string:  ",urlid,boilerplate,alchemy_category,alchemy_category_score,avglinksize,commonlinkratio_1,commonlinkratio_2,commonlinkratio_3,commonlinkratio_4,compression_ratio,embed_ratio,framebased,frameTagRatio,hasDomainLink,html_ratio,image_ratio,is_news,lengthyLinkDomain,linkwordscore,news_front_page,non_markup_alphanum_characters,numberOfLinks,numwords_in_url,parametrizedLinkRatio,spelling_errors_ratio,label"
  [error]   at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  [error]   at java.lang.Integer.parseInt(Integer.java:481)
  [error]   at java.lang.Integer.parseInt(Integer.java:527)
  [error]   at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
  [error]   at scala.collection.immutable.StringOps.toInt(StringOps.scala:30)
  [error]   at ScalaApp$$anonfun$4.apply(Program.scala:29)
  [error]   at ScalaApp$$anonfun$4.apply(Program.scala:27)
  [error]   at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
  [error]   at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
  [error]   at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  [error]   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  [error]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
  [error]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  [error]   at org.apache.spark.scheduler.Task.run(Task.scala:88)
  [error]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  [error]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  [error]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  [error]   at java.lang.Thread.run(Thread.java:745)
  [error] Driver stacktrace
  [error] (compile:run) Nonzero exit code: 1
Sangeen Khan

3 Answers


Your code is trying to convert the header columns into numbers, which of course are not numbers. Just skip the first line and you are good to go:

val lst = List(1, 2, 3, 4)
val records = sc.parallelize(lst).zipWithIndex.filter(_._2 > 0).map(_._1)
records.collect() // Array[Int] = Array(2, 3, 4)
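
Applied to the question's data, the same idea looks like this (a sketch reusing the rawData path from the question):

// drop the first (header) line by its index, then split as before
val rawData = sc.textFile("/home/sangeen/Kaggle/train.tsv")
val records = rawData.zipWithIndex.filter(_._2 > 0).map(_._1).map(line => line.split("\t"))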

Or don't read the header line at all.

For more: How do I skip a header from CSV files in Spark?

tuxdna

Just before running the code, remove the header with the help of these steps:

1) Open a terminal:

Ctrl + Alt + T

2) Go to the file's directory:

cd /home/sangeen/Programs/Classification 

3) Run this one-line command:

sed 1d train.tsv > train_noheader.tsv 

A header-less TSV file, train_noheader.tsv, will be generated in the directory.

Use the "train_noheader.tsv" file instead of "train.tsv".

For example:

   val rawData = sc.textFile("/home/sangeen/Kaggle/train.tsv")

will become

    val rawData = sc.textFile("/home/sangeen/Kaggle/train-noheader.tsv")
Sangeen Khan

tuxdna is correct that the header is the problem, but the method I use here, which filters the header out inline, will reduce the space and time complexity of the code.

// note the negation: keep every row that is NOT the header
val data = records.filter(r => !r.exists(_.contains("urlid,boilerplate,alchemy_category"))).map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
  LabeledPoint(label, Vectors.dense(features))
}

val nbData = records.filter(r => !r.exists(_.contains("urlid,boilerplate,alchemy_category"))).map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble).map(d => if (d < 0) 0.0 else d)
  LabeledPoint(label, Vectors.dense(features))
}
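
A common alternative (an editorial sketch, not from this answer) is to grab the header with first() on the raw lines and keep everything else, which avoids hard-coding column names:

// compare each raw line against the literal header line and drop it
val header = rawData.first()
val records = rawData.filter(_ != header).map(line => line.split("\t"))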
Kshitij Kulshrestha