2

I have a .tsv file pageviews_by_second consisting of the timestamp, site and requests fields:

"timestamp"              "site"   "requests"
"2015-03-16T00:09:55"   "mobile"    1595
"2015-03-16T00:10:39"   "mobile"    1544
"2015-03-16T00:19:39"   "desktop"   2460

I want the first (header) row to be gone, because it leads to errors in the operations I have to perform on the data.

I tried doing it in the following ways:

1. Filtering the RDD before splitting it

val RDD1 = sc.textFile("pageviews_by_second")       
val top_row = RDD1.first() 
//returns: top_row: String = "timestamp"    "site"  "requests"    
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() 
//returns: "2015-03-16T00:09:55"    "mobile"    1595

2. Filtering the RDD after splitting it

val RDD1 = sc.textFile("pageviews_by_second").map(_.split("\t")
RDD1.first()  //returns res0: Array[String] = Array("timestamp", "site", "requests")
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site", "requests")
val RDD2 = RDD1.filter(x => x(0)!="timestamp" && x(1)!="site" && x(2)!="requests")
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site", "requests")

3. Converting it into a DataFrame using a case class and then filtering it

case class Wiki(timestamp: String, site: String, requests: String)
val DF = sc.textFile("pageviews_by_second").map(_.split("\t")).map(w => Wiki(w(0), w(1), w(2))).toDF()
val top_row = DF.first()
//returns: top_row: org.apache.spark.sql.Row = ["timestamp","site","requests"]
DF.filter(_ => _ != top_row)
//returns: error: missing parameter type
val DF2 = DF.filter(_ => _ != top_row2)

Why is only the first method able to filter out the first row while the other two aren't? In method 3, why do I get the error and how can I rectify it?

Raviteja
Shailesh

4 Answers

3

You first need to understand the data types that you are comparing while removing the top row.

In method 1 you are comparing two Strings. String equality compares the contents, hence the header line gets filtered out.

In method 2 you are comparing two Arrays. In Scala, == on arrays compares references rather than contents, so x != top_row stays true even for the header array. Use the deep method of the array to do an element-wise comparison.
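
For illustration, a small Scala 2.x sketch (independent of Spark; note that deep was removed in Scala 2.13):

val a = Array("timestamp", "site", "requests")
val b = Array("timestamp", "site", "requests")
a == b            // false: == on arrays compares references, not contents
a.deep == b.deep  // true: deep allows a structural, element-wise comparison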

// Method 2
val RDD1 = sc.textFile("D:\\trial.txt").map(_.split("\t"))
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x.deep != top_row.deep)
RDD2.first().foreach(println(_))

In method 3 you are comparing two Row objects of the DataFrame. The "missing parameter type" error comes from the lambda syntax: each underscore introduces its own parameter, so _ => _ != top_row is not a single one-argument function; give the parameter a name instead. For the comparison itself, it is easier to convert each Row with toSeq followed by toArray and then use the deep method to filter out the first row of the DataFrame.

// Method 3
DF.filter(row => row.toSeq.toArray.deep != top_row.toSeq.toArray.deep)

Let me know if this helps. Thanks!

nareshbabral
  • Thank you for your answer. Is there another method to do the comparison instead of using `deep` ? Like some index comparison ? – Shailesh Feb 08 '16 at 13:13
  • http://stackoverflow.com/questions/5393243/how-do-i-compare-two-arrays-in-scala this might help – nareshbabral Feb 08 '16 at 13:16
2

First of all, you really should use the spark-csv package - it can automatically filter out headers when creating the DataFrame (or RDD). You simply have to specify that :)
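
Something along these lines should do it (a sketch assuming the Databricks spark-csv package for Spark 1.x is on the classpath; the delimiter option is specified because the file is tab-separated):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")      // treat the first line as a header instead of data
  .option("delimiter", "\t")     // the file is tab-separated, not comma-separated
  .load("pageviews_by_second")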

Secondly, rdds are not ordered in the way that you seem to think they are. Calling first is not guaranteed to return the first row of your csv-file. In your first scenario you apparently did get the first row, but it's better if you just consider yourself lucky in that case. Also, removing a header like this from a potentially VERY large data set is very inefficient, as Spark will need to search through all the rows to filter out just a single row.

If ordering is important to you for further calculations, you can always do a zipWithIndex. That way, you can then sort the rdd to preserve ordering.
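
A rough sketch of that approach, reusing the file name from the question (variable names are illustrative):

val indexed = sc.textFile("pageviews_by_second").zipWithIndex()
// drop the element with index 0 (the header), then sort by the index
// so the original line order is preserved for later calculations
val noHeader = indexed.filter { case (_, idx) => idx > 0 }
                      .sortBy { case (_, idx) => idx }
                      .map { case (line, _) => line.split("\t") }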

Glennie Helles Sindholt
  • Thanks Glennie, I understand that the `first()` method isn't necessarily the first and I just got lucky there and I didn't want to use the `spark-csv` package because I am just learning and wanted to code things from the ground-up. But how does the use of `spark-csv` not affect the efficiency while using a `filter()` does ? Under the hood, the `spark-csv` must do a similar operation, isn't it ? – Shailesh Feb 08 '16 at 13:22
  • I haven't read the code, but I imagine that `spark-csv` simply omits the first line, when it pipes the data into the `rdd` :) Also, I'm not sure I quite understand _code things from the ground-up_ - learning Spark is, IMHO, all about learning the Spark libraries and how to use them. – Glennie Helles Sindholt Feb 08 '16 at 14:27
  • BTW, using spark-csv simply means that you write `sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(myFile)` rather than `sc.textFile(myFile).map(_.split("\t"))` – Glennie Helles Sindholt Feb 08 '16 at 14:37
  • Thank you for all your help – Shailesh Feb 08 '16 at 14:41
2

There is a way to remove the header without a deep comparison:

data = sc.textFile('path_to_data')
header = data.first()  # extract header
data = data.filter(lambda x: x != header)  # filter out header

Might be relevant to you: How do I skip a header from CSV files in Spark?

purplebee
0

I found another method which would be more efficient than the filter method that I've used. Posting it as an answer as someone else may find it helpful:

rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }

Source: https://stackoverflow.com/a/27857878/3546389
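
Applied to the file from the question, a sketch might look like this (when a file is read with textFile, the header line ends up in partition 0, which is what the index check relies on):

val raw = sc.textFile("pageviews_by_second")
// drop the first line of partition 0 only, i.e. the header
val noHeader = raw.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}
val parsed = noHeader.map(_.split("\t"))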

Shailesh