3

I have a huge file of rdf triplets (subject predicate objects) as shown in the image below. The goals it extract the bold items and have the following output

  Item_Id | quantityAmount | quantityUnit | rank
    -----------------------------------------------
      Q31      24954         Meter       BestRank
      Q25       582         Kilometer    NormalRank  

enter image description here

I want to extract lines that follow the following pattern

  • subject is given a pointer (<Q31> <prop/P1082> <Pointer_Q31-87RF> .)

  • Pointer has a ranking (<Pointer_Q31-87RF> <rank> <BestRank> )
    and valuePointer (<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> )

  • The valuePointer in turn points to its Amount (<value/cebcf9> <quantityAmount> "24954") and Unit (<value/cebcf9> <quantityUnit> <Meter>)

The normal way is to read the file line by line and extract each one of these above patterns (using sc.textFile('inFile').flatMap(lambda x: extractFunc(x)) and then through different joins combine them such that it would provide the above table. Is there a better way to go after this? I am including the file sample below.

<Q31> <prop/P1082> <Pointer_Q31-87RF> .
<Pointer_Q31-87RF> <rank> <BestRank> .
<Pointer_Q31-87RF> <prop/Pointer_P1082> "+24954"^^<2001/XMLSchema#decimal> .
<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> 24954
<value/cebcf9> <quantityUnit> <Meter> .
<Q25> <prop/P1082> <Pointer_Q25-8E6C> .
<Pointer_Q25-8E6C> <rank> <NormalRank> .
<Pointer_Q25-8E6C> <prop/Pointer_P1082> "+24954”
<Pointer_Q25-8E6C> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> "582" .
<value/cebcf9> <quantityUnit> <Kilometer> .
user1848018
  • 1,086
  • 1
  • 14
  • 33

1 Answers1

4

If you can use \n<Q as the delimiter to create RDD elements, then it becomes a pure python task to parse the data blocks. Below I create a function (based on your sample) to parse the block texts using regexes and retrieve cols information into Row object (you might have to adjust the regexes to reflect the actual data patterns, i.e. case sensitivity, extra white spaces etc.):

  • For each RDD element, split by '\n' (line-mode)
  • and then for each line, split by > < into a list y
  • we can find rank, quantityUnit by checking y[1] and y[2], quantityAmount by checking y[1] and Item_id by checking y[0].
  • Create Row object by iterating all required fields, set value to None for missing fields

    from pyspark.sql import Row
    import re
    
    # skipped the code to initialize SparkSession
    
    # field names to retrieve
    cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank']
    
    def parse_rdd_element(x, cols):
        try:
            row = {}
            for e in x.split('\n'):
                y = e.split('> <')
                if len(y) < 2:
                    continue
                if y[1] in ['rank', 'quantityUnit']:
                    row[y[1]] = y[2].split(">")[0]
                else:
                    m = re.match(r'^quantityAmount>\D*(\d+)', y[1])
                    if m:
                        row['quantityAmount'] = m.group(1)
                        continue
                    m = re.match('^(?:<Q)?(\d+)', y[0])
                    if m:
                        row['Item_Id'] = 'Q' + m.group(1)
            # if row is not EMPTY, set None to missing field
            return Row(**dict([ (k, row[k]) if k in row else (k, None) for k in cols])) if row else None
        except:
            return None
    

setup RDD using newAPIHadoopFile() with \n<Q as delimiter:

rdd = spark.sparkContext.newAPIHadoopFile(
    '/path/to/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\n<Q'}
)

Use the map function to parse the RDD element into Row object

rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]

Convert the above RDD to dataframe

df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit|      rank|
+-------+--------------+------------+----------+
|    Q31|         24954|       Meter|  BestRank|
|    Q25|           582|   Kilometer|NormalRank|
+-------+--------------+------------+----------+

Some Notes:

  • For better performance, pre-compile all regex patterns using re.compile() before passing them to the parse_rdd_element() function.

  • In case there could be spaces/tabs between \n and <Q, multiple blocks will be added into the same RDD element, just split the RDD element by \n\s+<Q and replace map() with flatMap().

Reference: creating spark data structure from multiline record

jxc
  • 13,553
  • 4
  • 16
  • 34
  • I like the approach but as I said earlier in the comments, I have no way to know if the order shown above is always preserved in the large file. – user1848018 Sep 17 '19 at 14:56
  • hi, @user1848018, what do you mean by `order`? can you actually split the text file by using `\n – jxc Sep 17 '19 at 17:15
  • By order I mean, what if Q31> . – user1848018 Sep 17 '19 at 17:23
  • @user1848018, I only saw a missing `<`, what does that mean? or some specific substring must be shown in text matching? BTW. Is it possible that lines related to `` mess up with lines related to ``, if that is the truth, then will need a completely different approach. – jxc Sep 17 '19 at 17:34
  • BTW. as long as you can use a delimiter like `\n, which might be from a missing `<` or extra spaces. we can split the RDD using re.split() later and then run flatMap() – jxc Sep 17 '19 at 17:46