
I have multiple XML files that look something like this:

    <?xml version="1.0" encoding="UTF-8"?>
    <parent>
      <row AcceptedAnswerId="15" AnswerCount="5"
           Body="&lt;p&gt;How should I elicit prior distributions from experts when fitting a Bayesian model?&lt;/p&gt;&#10;"
           CommentCount="1" CreationDate="2010-07-19T19:12:12.510"
           FavoriteCount="17" Id="1" LastActivityDate="2010-09-15T21:08:26.077"
           OwnerUserId="8" PostTypeId="1" Score="26"
           Tags="&lt;bayesian&gt;&lt;prior&gt;&lt;elicitation&gt;"
           Title="Eliciting priors from experts" ViewCount="1457" />

I would like to use PySpark to count the lines that do NOT contain the string `<row`.

My current thought:

    def startWithRow(line):
        return line.strip().startswith("<row")

    sc.textFile(localpath("folder_containing_xml.gz_files")) \
        .filter(lambda x: not startWithRow(x)) \
        .count()

I have tried validating this, but even a simple line count gives results that don't make sense (I downloaded one of the XML files and ran `wc -l` on it, and the line count did not match the count from PySpark).
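For reference, the local check I'm comparing against looks roughly like this (a minimal sketch; the filename is just a placeholder for one of the downloaded files):

    import gzip

    # Count lines that do NOT start with "<row" in one downloaded file,
    # to compare against the PySpark count (filename is hypothetical).
    with gzip.open("Posts.xml.gz", "rt", encoding="utf-8") as f:
        print(sum(1 for line in f if not line.strip().startswith("<row")))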

Does anything about my approach above stand out as wrong/weird?

Aus_10

  • Possible duplicate of [How to parse xml files in Apache Spark?](https://stackoverflow.com/questions/33280821/how-to-parse-xml-files-in-apache-spark) – Chiheb Nexus May 26 '17 at 19:28

2 Answers

I would just use the lxml library together with Spark to count the `row` elements or to filter them out.

    from lxml import etree

    def find_number_of_rows(path):
        # Try to parse the input as a raw XML string first; if that
        # fails, treat it as a path to an XML file instead.
        try:
            tree = etree.fromstring(path)
        except Exception:
            tree = etree.parse(path)
        # Count the <row> elements under the root
        return len(tree.findall('row'))

    rdd = spark.sparkContext.parallelize(paths)  # paths is a list of all your paths
    rdd.map(lambda x: find_number_of_rows(x)).collect()

For example, if you have a list of XML strings (just a toy example), you can do the following:

    text = [
        """
        <parent>
          <row ViewCount="1457" />
          <row ViewCount="1457" />
        </parent>
        """,
        """
        <parent>
          <row ViewCount="1457" />
          <row ViewCount="1457" />
          <row ViewCount="1457" />
        </parent>
        """
    ]

    rdd = spark.sparkContext.parallelize(text)
    rdd.map(lambda x: find_number_of_rows(x)).collect()  # [2, 3]

In your case, your function has to take in a path to a file instead. Then you can count or filter those rows. I don't have a full file to test on, so let me know if you need extra help!
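Since your files are gzipped, a path-based version could look roughly like this (a sketch assuming standard gzip compression; `find_number_of_rows_in_file` is just an illustrative name, untested on your data):

    import gzip
    from lxml import etree

    def find_number_of_rows_in_file(path):
        # Decompress the gzipped XML file and parse it directly
        # (hypothetical helper, assuming plain gzip compression)
        with gzip.open(path, 'rb') as f:
            tree = etree.parse(f)
        # Count the <row> elements under the root
        return len(tree.getroot().findall('row'))

    rdd = spark.sparkContext.parallelize(paths)
    rdd.map(find_number_of_rows_in_file).collect()

Note that with this approach the paths have to be readable from the executors, not just the driver.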

titipata
My eventual approach: try to parse each `<row` line as XML and count the ones that fail.

    import xml.etree.ElementTree as ET
    from collections import Counter

    def badRowParser(x):
        # Returns True if the line parses as a standalone XML element
        try:
            ET.fromstring(x.strip())
            return True
        except ET.ParseError:
            return False

    posts = sc.textFile(localpath('folder_containing_xml.gz_files'))
    # Keep only the "<row" lines and flag the ones that fail to parse
    rejected = posts.filter(lambda l: "<row" in l).map(lambda x: not badRowParser(x))
    ans = rejected.collect()

    Counter(ans)
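The resulting `Counter` maps `True` to the number of `<row` lines that failed to parse and `False` to the ones that parsed cleanly, so the bad-line count can be read off directly:

    bad_lines = Counter(ans)[True]  # "<row" lines that did not parse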
Aus_10