
I have Python code that I want to run in Spark, but I cannot get the RDD logic right for Spark 1.1. The code works perfectly in plain Python, and I would like to implement the same thing in Spark.

import lxml.etree
import csv

sc = SparkContext
data = sc.textFile("pain001.xml")
rdd = sc.parallelize(data)
# compile xpath selectors for element text
selectors = ('GrpHdr/MsgId', 'GrpHdr/CreDtTm') # etc...
xpath = [lxml.etree.XPath('{}/text()'.format(s)) for s in selectors]

# open result csv file
with open('pain.csv', 'w') as paincsv:
    writer = csv.writer(paincsv)
    # read file with 1 'CstmrCdtTrfInitn' record per line
    with open(rdd) as painxml:
        # process each record
        for index, line in enumerate(painxml):
            if not line.strip(): # allow empty lines
                continue
            try:
                # each line is an xml doc
                pain001 = lxml.etree.fromstring(line)
                # move to the customer elem
                elem = pain001.find('CstmrCdtTrfInitn')
                # select each value and write to csv
                writer.writerow([xp(elem)[0].strip() for xp in xpath])
            except Exception, e:
                # give a hint where things go bad
                sys.stderr.write("Error line {}, {}".format(index, str(e)))
                raise  

I am getting an error saying that the RDD is not iterable.
  1. I want to wrap this code in a function and run it as a standalone Spark program.
  2. I want the input file to be read from HDFS as well as from the local file system, using the Python module.

I would appreciate any help with this problem.

1 Answer

The error you are getting is quite informative. With with open(rdd) as painxml: you pass an RDD to open, and after that you try to iterate over it as if it were a normal Python list or tuple, but an RDD is not iterable. If you read the textFile documentation, you will also notice that it returns an RDD.
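
As a rough sketch of the setup side (assuming a standard PySpark install; the function names and the app name are made up for illustration): SparkContext has to be instantiated rather than just assigned, textFile already gives you an RDD so there is nothing left to parallelize, and you get plain Python data back on the driver with an action such as take. The same textFile call accepts a local path or an hdfs:// URI, so the path you pass decides where the input is read from.

```python
def make_context(app_name="pain001-to-csv"):
    """Create a SparkContext instance (app_name is a hypothetical label)."""
    # Imported here so the helpers below can be read and tested without Spark.
    from pyspark import SparkContext
    # Note the call: `sc = SparkContext` alone only binds the class itself.
    return SparkContext(appName=app_name)


def load_lines(sc, path):
    """Return an RDD with one element per line of the file at `path`."""
    # textFile already returns an RDD, so no sc.parallelize() is needed.
    return sc.textFile(path)


def peek(lines, n=5):
    """Bring the first n records back to the driver as a plain list."""
    # take() is an action: it runs the job and returns a Python list,
    # which, unlike the RDD itself, can be used in a for loop.
    return lines.take(n)
```

In a standalone script you would call make_context() once, pass the result to load_lines, and only then apply transformations to the returned RDD.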

I think the problem is that you are trying to solve this in the classic, sequential way, whereas in Spark you need to approach it within the MapReduce paradigm. If you are new to Apache Spark, you can audit the course Scalable Machine Learning with Apache Spark. I would also recommend updating Spark to version 1.5 or 1.6 (which will come out soon).

Here is a small example (not using XML):

  1. Import the required modules

    import re
    import csv
    
  2. Read the input file

    content = sc.textFile("../test")
    content.collect()
    # Out[8]: [u'1st record-1', u'2nd record-2', u'3rd record-3', u'4th record-4']
    
  3. Map the RDD to manipulate each row

    # Map it and convert it to tuples
    rdd = content.map(lambda s: tuple(re.split("-+",s)))
    rdd.collect()
    # Out[9]: [(u'1st record', u'1'),
    #          (u'2nd record', u'2'),
    #          (u'3rd record', u'3'),
    #          (u'4th record', u'4')]
    
  4. Write your data

    with open("../test.csv", "w") as fw:
        writer = csv.writer(fw)
    
        for r1 in rdd.toLocalIterator():
            writer.writerow(r1)
    
  5. Take a look...

    $ cat ../test.csv
    1st record,1
    2nd record,2
    3rd record,3
    4th record,4
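
If the output should end up in HDFS rather than on the driver's local disk, one possible variation of step 4 is to format each tuple as a CSV line inside a map and let Spark write the result with saveAsTextFile. This is only a sketch: to_csv_line and save_as_csv are hypothetical helper names, the output location is up to you, and the code assumes Python 3 (which, as far as I know, PySpark supports from 1.4 on).

```python
import csv
import io


def to_csv_line(row):
    """Format one tuple of fields as a single CSV line (no trailing newline)."""
    buf = io.StringIO()
    # lineterminator="" because saveAsTextFile adds the newline itself.
    csv.writer(buf, lineterminator="").writerow(row)
    return buf.getvalue()


def save_as_csv(rdd, out_dir):
    """Write an RDD of tuples as CSV text under out_dir (hypothetical helper)."""
    # Each partition becomes one part file inside out_dir.
    rdd.map(to_csv_line).saveAsTextFile(out_dir)
```

Keep in mind that saveAsTextFile produces a directory of part files, not a single CSV file, and that the target directory must not exist beforehand.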
    

Note: If you want to read XML with Apache Spark, there are libraries on GitHub such as spark-xml. You may also find the question "xml processing in spark" interesting.
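
Without an extra library, a sketch of the question's per-line parsing in map style could look like the following. It swaps lxml for the standard library's xml.etree.ElementTree, and it assumes one complete XML document per line and no XML namespace on the elements (real pain.001 files often declare one, in which case the paths would need it). parse_record and extract_rows are illustrative names, not part of Spark.

```python
import xml.etree.ElementTree as ET

# Field paths from the question, relative to the CstmrCdtTrfInitn element.
SELECTORS = ("GrpHdr/MsgId", "GrpHdr/CreDtTm")


def parse_record(line):
    """Turn one single-line XML document into a tuple of field values."""
    root = ET.fromstring(line)
    elem = root.find("CstmrCdtTrfInitn")
    if elem is None:
        raise ValueError("no CstmrCdtTrfInitn element in record")
    # A missing field becomes an empty string instead of raising.
    return tuple(elem.findtext(s, default="").strip() for s in SELECTORS)


def extract_rows(lines):
    """Map an RDD of XML lines to an RDD of tuples, skipping blank lines."""
    # Both steps are lazy transformations; nothing runs until an action.
    return lines.filter(lambda l: l.strip()).map(parse_record)
```

The resulting RDD of tuples can then be written out the same way as in step 4, for example by looping over toLocalIterator().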

Alberto Bonsanto