
Suppose these are my data:

‘Maps‘ and ‘Reduces‘ are two phases of solving a query in HDFS.
‘Map’ is responsible to read data from input location.
it will generate a key value pair.
that is, an intermediate output in local machine.
’Reducer’ is responsible to process the intermediate.
output received from the mapper and generate the final output.

and I want to add a number to every line, like the output below:

1,‘Maps‘ and ‘Reduces‘ are two phases of solving a query in HDFS.
2,‘Map’ is responsible to read data from input location.
3,it will generate a key value pair.
4,that is, an intermediate output in local machine.
5,’Reducer’ is responsible to process the intermediate.
6,output received from the mapper and generate the final output.

and save the result to a file.

I've tried:

import org.apache.spark.{SparkConf, SparkContext}

object DS_E5 {
  def main(args: Array[String]): Unit = {

    var i = 0
    val conf = new SparkConf().setAppName("prep").setMaster("local")
    val sc = new SparkContext(conf)
    val sample1 = sc.textFile("data.txt")
    for (sample <- sample1) {
      i = i + 1
      val ss = sample.map(l => (i, sample))
      println(ss)
    }
  }
}

but its output is like below:

Vector((1,‘Maps‘ and ‘Reduces‘ are two phases of solving a query in HDFS.))
...

How can I edit my code to generate my desired output?

AHAD
2 Answers


zipWithIndex is what you need here. It maps an RDD[T] to an RDD[(T, Long)] by adding the index as the second element of each pair.

sample1
   .zipWithIndex()
   .map { case (line, i) => i.toString + ", " + line }

or using string interpolation (see a comment by @DanielC.Sobral)

sample1
    .zipWithIndex()
    .map { case (line, i) => s"$i, $line" }
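
The same pattern can be checked locally on an ordinary Scala collection, since the standard library's zipWithIndex behaves analogously to the RDD method (a quick sketch with made-up sample lines, using i + 1 because the question's desired numbering starts at 1):

```scala
object ZipWithIndexDemo {
  def main(args: Array[String]): Unit = {
    val lines = List("first line", "second line", "third line")

    // zipWithIndex pairs each element with its 0-based position;
    // adding 1 gives the 1-based numbering shown in the question.
    val numbered = lines.zipWithIndex.map { case (line, i) => s"${i + 1}, $line" }

    numbered.foreach(println)
    // 1, first line
    // 2, second line
    // 3, third line
  }
}
```

Because the map produces plain Strings rather than tuples, there are no parentheses in the output.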
zero323
  • Might need `i+1` if counting starts at 1. – jwvh Jul 03 '15 at 19:34
  • thank you @zero323, it is ok , but there are parentheses yet (1,line), i want to remove these parentheses. – AHAD Jul 03 '15 at 19:39
  • I am not sure if I understand. Do you want the output to be `RDD[String]`? – zero323 Jul 03 '15 at 19:41
  • this is your code output : (0,‘Maps‘ and ‘Reduces‘ are two phases of solving a query in HDFS.) but i want this output: 0,‘Maps‘ and ‘Reduces‘ are two phases of solving a query in HDFS. – AHAD Jul 03 '15 at 19:46
  • Personally, I prefer string interpolation and wish `+` on strings would disappear. :) – Daniel C. Sobral Jul 03 '15 at 19:55
  • +1 although wouldn't .read() be read by multiple executors, so the order of the index set by zipWithIndex() is not guaranteed? In other words, yes, you get all lines numbered, but those indices aren't necessarily stable and depend on how many executors you had, how the HDFS file is distributed among them, etc.? – Tagar Sep 11 '17 at 14:57
  • @Tagar For `textFile`, the order of partitions and of values within a partition comes from the input format logic. It is stable, and consecutive splits are assigned to consecutive partitions. – zero323 Sep 14 '17 at 11:06

By calling val sample1 = sc.textFile("data.txt") you are creating a new RDD.

If you just need the output printed, you can try the following code:

sample1.zipWithIndex().foreach(f => println(f._2 + ", " + f._1))

Basically, by using this code, you do the following:

  1. Using .zipWithIndex() returns a new RDD[(T, Long)], where (T, Long) is a tuple, T is the previous RDD's element datatype (java.lang.String, I believe), and Long is the index of the element in the RDD.
  2. You performed a transformation; now you need to perform an action. foreach suits this case very well. What it basically does is apply your statement to every element of the current RDD, so here we just call a quickly formatted println.
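
The tuple-access style used above can be tried on a plain Scala collection to see the ordering of `_1` and `_2` (a local sketch with hypothetical sample data, not tied to Spark):

```scala
object TupleAccessDemo {
  def main(args: Array[String]): Unit = {
    // zipWithIndex produces (element, index) pairs: ("alpha", 0), ("beta", 1)
    val pairs = List("alpha", "beta").zipWithIndex

    // _1 is the original element and _2 is the index,
    // so printing f._2 first puts the number at the start of the line.
    pairs.foreach(f => println(f._2 + ", " + f._1))
    // 0, alpha
    // 1, beta
  }
}
```

To write to a file instead of printing, one would typically map to these formatted strings and then call saveAsTextFile on the resulting RDD, as foreach with println only prints on the executors.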
Roman Khlebnov