3

Data in my first RDD is like

1253
545553
12344896
1 2 1
1 43 2
1 46 1
1 53 2

Now the first 3 integers are some counters that I need to broadcast. After that all the lines have the same format like

1 2 1
1 43 2

I will map all those values after 3 counters to a new RDD after doing some computation with them in function. But I'm not able to understand how to separate those first 3 values and map the rest normally.

My Python code is like this

documents = sc.textFile("file.txt").map(lambda line: line.split(" "))

final_doc = documents.map(lambda x: (int(x[0]), function1(int(x[1]), int(x[2])))).reduceByKey(lambda x, y: x + " " + y)

It works only when first 3 values are not in the text file but with them it gives error.

I don't want to skip those first 3 values, but store them in 3 broadcast variables and then pass the remaining dataset in map function.

And yes the text file has to be in that format only. I cannot remove those 3 values/counters

Function1 is just doing some computation and returning the values.

Nicky
  • 333
  • 2
  • 4
  • 11
  • Possible duplicate of [How to skip header from csv files in Spark?](http://stackoverflow.com/questions/27854919/how-to-skip-header-from-csv-files-in-spark) – zero323 Sep 30 '15 at 23:56
  • But I don't want to skip, I want to store those 3 values in 3 different variables and then work with all other data in dataset. I don't want to pass those 3 values to the map function I described above. – Nicky Oct 01 '15 at 00:04
  • Load data: `raw = sc.textFile("file.txt")`, Take first three lines you want to use for broadcast: `header = raw.take(3)`, use one of the methods described in the linked answer to skip header and process the rest. – zero323 Oct 01 '15 at 00:08
  • Yeah that is correct . I'll try that, thanks.. – Nicky Oct 01 '15 at 00:11
  • I tried it. but since header contains 3 values so its not working. The methods in the linked answer do not deal with more then one values. – Nicky Oct 01 '15 at 00:55
  • sorry My bad it worked..... – Nicky Oct 01 '15 at 01:30

3 Answers3

5
  1. Imports for Python 2

    from __future__ import print_function
    
  2. Prepare dummy data:

    s = "1253\n545553\n12344896\n1 2 1\n1 43 2\n1 46 1\n1 53 2"
    with open("file.txt", "w") as fw: fw.write(s)
    
  3. Read raw input:

    raw = sc.textFile("file.txt")
    
  4. Extract header:

    header = raw.take(3)
    print(header)
    ### [u'1253', u'545553', u'12344896']
    
  5. Filter lines:

    • using zipWithIndex

      content = raw.zipWithIndex().filter(lambda kv: kv[1] > 2).keys()
      print(content.first())
      ## 1 2 1
      
    • using mapPartitionsWithIndex

      from itertools import islice
      
      content = raw.mapPartitionsWithIndex(
          lambda i, iter: islice(iter, 3, None) if i == 0 else iter)
      
      print(content.first())
      ## 1 2 1
      

NOTE: All credit goes to pzecevic and Sean Owen (see linked sources).

Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
3

In my case I have a csv file like below

----- HEADER START -----
We love to generate headers
#who needs comment char?
----- HEADER END -----

colName1,colName2,...,colNameN
val__1.1,val__1.2,...,val__1.N

Took me a day to figure out

val rdd = spark.read.textFile(pathToFile)  .rdd
  .zipWithIndex() // get tuples (line, Index)
  .filter({case (line, index) => index > numberOfLinesToSkip})
  .map({case (line, index) => l}) //get rid of index
val ds = spark.createDataset(rdd) //convert rdd to dataset
val df=spark.read.option("inferSchema", "true").option("header", "true").csv(ds) //parse csv

Sorry code in scala, however can be easily converted to python

Anton
  • 1,432
  • 13
  • 17
0

First take the values using take() method as zero323 suggested

raw  = sc.textfile("file.txt")
headers = raw.take(3)

Then

final_raw = raw.filter(lambda x: x != headers)

and done.

Nicky
  • 333
  • 2
  • 4
  • 11