
We have publication data that we want to tag into various categories. We have stored it in HBase, and from there we load it into a Spark RDD and tag it using Scala code. A sample of the HBase data looks like this:

PubEntity:Abstract                  timestamp=1476537886382, value=not found                                                                  
 PubEntity:Affiliations              timestamp=1476537886382, value=[]                                                                         
 PubEntity:Article_Title             timestamp=1476537886382, value=Formate assay in body fluids: application in methanol poisoning.           
 PubEntity:Author                    timestamp=1476537886382, value=[{'LastName': 'Makar', 'ForeName': 'A B', 'author_name': 'A B Makar', 'Initials': 'AB', 'author_affiliation': 'not found'}, {'LastName': 'McMartin', 'ForeName': 'K E', 'author_name': 'K E McMartin', 'Initials': 'KE', 'author_affiliation': 'not found'}, {'LastName': 'Palese', 'ForeName': 'M', 'author_name': 'M Palese', 'Initials': 'M', 'author_affiliation': 'not found'}, {'LastName': 'Tephly', 'ForeName': 'T R', 'author_name': 'T R Tephly', 'Initials': 'TR', 'author_affiliation': 'not found'}]
 PubEntity:Journal_Title             timestamp=1476537886382, value=Biochemical medicine                                                       
 PubEntity:PMID                      timestamp=1476537886382, value=1                                                                          
 PubRemaining:Countries              timestamp=1476537886382, value=[]                                                                         
 PubRemaining:Created_At             timestamp=1476537886382, value=170812800.0                                                                
 PubRemaining:DOI                    timestamp=1476537886382, value=not found                                                                  
 PubRemaining:Date_Created           timestamp=1476537886382, value=19760116                                                                   
 PubRemaining:ISO_Abbreviation       timestamp=1476537886382, value=Biochem Med                                                                
 PubRemaining:ISSN                   timestamp=1476537886382, value=0006-2944                                                                  
 PubRemaining:Pub_Date               timestamp=1476537886382, value=01 Jun, 1975                                                               
 PubRemaining:Year                   timestamp=1476537886382, value=1975 

We have been able to make an RDD in Spark as explained in the first answer to How to read from HBase using Spark. Here is the code:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setAppName("HBaseRead")
      .setMaster("local[2]")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)

    // Point the HBase client at the local cluster and tell TableInputFormat
    // which table to scan
    val conf = HBaseConfiguration.create()
    val tableName = "name_of_the_database"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Create the table if it does not exist yet
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    // Each record is a (row key, Result) pair; the Result holds every cell of the row
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    println(" Number of Records found : " + hBaseRDD.count())
    println(hBaseRDD.first())
    sc.stop()
  }
}

After running this code chunk in Scala, I got defined module HBaseRead. I then ran HBaseRead.main(Array()), which outputs the number of records found and prints the first record as:

(31,keyvalues={1/PubEntity:Abstract/1476537886382/Put/vlen=9/mvcc=0, 1/Entity:Affiliations/1476537886382/Put/vlen=2/mvcc=0, 1/Entity:Article_Title/1476537886382/Put/vlen=64/mvcc=0, 1/Entity:Author/1476537886382/Put/vlen=497/mvcc=0, 1/Entity:Journal_Title/1476537886382/Put/vlen=20/mvcc=0, 1/Entity:PMID/1476537886382/Put/vlen=1/mvcc=0, 1/Remaining:Countries/1476537886382/Put/vlen=2/mvcc=0, 1/Remaining:Created_At/1476537886382/Put/vlen=11/mvcc=0, 1/Remaining:DOI/1476537886382/Put/vlen=9/mvcc=0, 1/Remaining:Date_Created/1476537886382/Put/vlen=8/mvcc=0, 1/Remaining:ISO_Abbreviation/1476537886382/Put/vlen=11/mvcc=0, 1/Remaining:ISSN/1476537886382/Put/vlen=9/mvcc=0, 1/Remaining:Pub_Date/1476537886382/Put/vlen=12/mvcc=0, 1/Remaining:Year/1476537886382/Put/vlen=4/mvcc=0})

Now in this output you can see entries like vlen=12/mvcc=0. On checking the data, I found that vlen is the byte length of each value. I couldn't figure out what mvcc is for. We want the output to show the actual words/numbers instead of vlen=4. Further, we want to read these entries, find certain words and phrases in them, and tag the publications accordingly, all in Scala.
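
To make the goal concrete, here is a minimal sketch of the kind of extraction and tagging we are after, built on the hBaseRDD from the code above. The column family and qualifier names match our table; the keyword list is only an illustration:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

// Decode one cell of a row; getValue returns null when the cell is missing
def cell(result: Result, family: String, qualifier: String): String =
  Option(result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)))
    .map(Bytes.toString)
    .getOrElse("not found")

// Pull the human-readable fields out of each Result instead of printing
// the raw keyvalues (which is where the vlen/mvcc summary comes from)
val articles = hBaseRDD.map { case (_, result) =>
  val title = cell(result, "PubEntity", "Article_Title")
  val abstr = cell(result, "PubEntity", "Abstract")
  (title, abstr)
}

// Tag each publication with every keyword that occurs in its title or abstract
val keywords = Seq("methanol", "poisoning")   // example tag vocabulary only
val tagged = articles.map { case (title, abstr) =>
  val text = (title + " " + abstr).toLowerCase
  (title, keywords.filter(text.contains))
}

tagged.take(5).foreach(println)

The idea is that Result.getValue returns the raw bytes of a single cell, so decoding them with Bytes.toString yields the stored text rather than the vlen=... summary.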

Any links to helpful online resources in this regard would be highly appreciated.
