
I have a dataset that contains lines in the following tab-separated format:

Title<\t>Text

Now for every word in Text, I want to create a (Word,Title) pair. For instance:

ABC      Hello World

gives me

(Hello, ABC)
(World, ABC)

Using Scala, I wrote the following:

val file = sc.textFile("s3n://file.txt")
val title = file.map(line => line.split("\t")(0))
val wordtitle = file.map(line => (line.split("\t")(1).split(" ").map(word => (word, line.split("\t")(0)))))

But this gives me the following output:

[Lscala.Tuple2;@2204b589
[Lscala.Tuple2;@632a46d1
[Lscala.Tuple2;@6c8f7633
[Lscala.Tuple2;@3e9945f3
[Lscala.Tuple2;@40bf74a0
[Lscala.Tuple2;@5981d595
[Lscala.Tuple2;@5aed571b
[Lscala.Tuple2;@13f1dc40
[Lscala.Tuple2;@6bb2f7fa
[Lscala.Tuple2;@32b67553
[Lscala.Tuple2;@68d0b627
[Lscala.Tuple2;@8493285

How do I solve this?

Further details

What I want to achieve is to count the number of Words that occur in a Text for a particular Title.

The subsequent code that I have written is:

val wordcountperfile = file.map(line => (line.split("\t")(1).split(" ").flatMap(word => word), line.split("\t")(0))).map(word => (word, 1)).reduceByKey(_ + _)

But it does not work. Please feel free to share your input on this. Thanks!

AngryPanda

4 Answers


So... in Spark you work with a distributed data structure called an RDD. RDDs provide functionality similar to Scala's collection types.

val fileRdd = sc.textFile("s3n://file.txt")
// RDD[ String ]

val splitRdd = fileRdd.map( line => line.split("\t") )
// RDD[ Array[ String ] ]

val yourRdd = splitRdd.flatMap( arr => {
  val title = arr( 0 )
  val text = arr( 1 )
  val words = text.split( " " )
  words.map( word => ( word, title ) )
} )
// RDD[ ( String, String ) ]

// Now, if you want to print this...
yourRdd.foreach( { case ( word, title ) => println( s"{ $word, $title }" ) } )

// if you want the word count per title (this counts non-unique words),
val countRdd = yourRdd
  .groupBy( { case ( word, title ) => title } )  // group by title
  .map( { case ( title, iter ) => ( title, iter.size ) } ) // count for every title
sarveshseri
  • Thanks for the quick post. However, it gives me an error on seq.length. Basically, I want to store the ((Word,Title), COUNT) in a file. – AngryPanda Apr 23 '15 at 10:52
  • I'm using this to count the words: val countRdd = yourRdd.map(title => (title, 1)).reduceByKey(_ + _) Can you confirm it's correct. – AngryPanda Apr 23 '15 at 10:55
  • Yes... because `Iterable` does not have `length`... they have `size`. Changing the answer. – sarveshseri Apr 23 '15 at 11:14
  • Yes... your reduceByKey approach is also Ok. I am giving you number of words per title. So... `( title, count )`. – sarveshseri Apr 23 '15 at 11:16
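
Putting the comment thread together, here is a minimal sketch of the reduceByKey variant that produces ((Word, Title), COUNT) and writes it out. It assumes `yourRdd` from the answer above, and the output path is only a placeholder.

// Count occurrences of each (word, title) pair, then persist the result.
// `yourRdd` is the RDD[ ( String, String ) ] built above;
// the S3 output path is a placeholder, not a real location.
val pairCounts = yourRdd
  .map { case ( word, title ) => ( ( word, title ), 1 ) }
  .reduceByKey( _ + _ )   // ( ( word, title ), count )

pairCounts.saveAsTextFile( "s3n://output/word-title-counts" )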

This is how it can be solved using the newer DataFrame API. First, read the data using "\t" as the delimiter:

val df = spark.read
  .option("delimiter", "\t")
  .option("header", false)
  .csv("s3n://file.txt")
  .toDF("title", "text")

Then, split the text column on space and explode to get one word per row.

val df2 = df.select($"title", explode(split($"text", " ")).as("words"))

Finally, group on the title column and count the number of words for each.

val countDf = df2.groupBy($"title").agg(count($"words"))
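
For the $-column syntax and the functions above to compile, a few imports are needed. Below is a self-contained sketch of the same steps; the SparkSession setup and the app name are assumptions, not part of the original answer.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, explode, split}

val spark = SparkSession.builder().appName("wordsPerTitle").getOrCreate()
import spark.implicits._   // enables the $"colName" syntax

// same steps as above, repeated so the snippet runs on its own
val df = spark.read
  .option("delimiter", "\t")
  .option("header", false)
  .csv("s3n://file.txt")
  .toDF("title", "text")

val df2 = df.select($"title", explode(split($"text", " ")).as("words"))
val countDf = df2.groupBy($"title").agg(count($"words"))

countDf.show()   // one row per title with its word count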
Shaido

Another version with the DataFrame API:

// read into DataFrame
val viewsDF = spark.read.text("s3n://file.txt")

// Split
val splitedViewsDF = viewsDF
  .withColumn("col1", split($"value", "\\t").getItem(0))
  .withColumn("col2", split($"value", "\\t").getItem(1))
  .drop($"value")

Sample

scala> val viewsDF=spark.read.text("spark-labs/data/wiki-pageviews.txt")
viewsDF: org.apache.spark.sql.DataFrame = [value: string]

scala> viewsDF.printSchema
root
 |-- value: string (nullable = true)


scala> viewsDF.limit(5).show
+------------------+
|             value|
+------------------+
|  aa Main_Page 3 0|
|  aa Main_page 1 0|
|  aa User:Savh 1 0|
|  aa Wikipedia 1 0|
|aa.b User:Savh 1 0|
+------------------+


scala> val splitedViewsDF = viewsDF.withColumn("col1", split($"value", "\\s+").getItem(0)).withColumn("col2", split($"value", "\\s+").getItem(1)).withColumn("col3", split($"value", "\\s+").getItem(2)).drop($"value")
splitedViewsDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]

scala>

scala> splitedViewsDF.printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)


scala> splitedViewsDF.limit(5).show
+----+---------+----+
|col1|     col2|col3|
+----+---------+----+
|  aa|Main_Page|   3|
|  aa|Main_page|   1|
|  aa|User:Savh|   1|
|  aa|Wikipedia|   1|
|aa.b|User:Savh|   1|
+----+---------+----+


scala>
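
Applied to the question's tab-separated Title/Text lines, the same pattern might look like the sketch below. The column names and the final aggregation are assumptions, and it relies on spark.implicits._ and org.apache.spark.sql.functions._ being in scope, as in the previous answer.

// Split each line on the tab into title and text,
// then explode the text into one word per row and count words per title.
val linesDF = spark.read.text("s3n://file.txt")

val titledDF = linesDF
  .withColumn("title", split($"value", "\\t").getItem(0))
  .withColumn("text", split($"value", "\\t").getItem(1))
  .drop($"value")

val wordsPerTitle = titledDF
  .select($"title", explode(split($"text", " ")).as("word"))
  .groupBy($"title")
  .count()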
Zhen Zeng

The answer provided above is not good enough. .map( line => line.split("\t") ) may cause:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 1485, ip-172-31-113-181.us-west-2.compute.internal, executor 10): java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14

in case the last column is empty. The best approach is explained here - Split 1 column into 3 columns in spark scala
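
One way to guard against that exception, sketched on top of fileRdd from the first answer: pass a negative limit to split so trailing empty fields are kept, and filter out malformed lines explicitly.

// String.split drops trailing empty strings by default, so a line like
// "SomeTitle\t" yields an array of length 1 and arr(1) throws.
// A negative limit keeps the trailing empty field; the filter guards the rest.
val safePairs = fileRdd
  .map( line => line.split( "\t", -1 ) )
  .filter( arr => arr.length >= 2 )
  .flatMap( arr => {
    val title = arr( 0 )
    arr( 1 ).split( " " ).filter( _.nonEmpty ).map( word => ( word, title ) )
  } )
// RDD[ ( String, String ) ], same shape as yourRdd in the first answer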

Maor Aharon